未验证 提交 f1be3e99 编写于 作者: L liyuntao 提交者: GitHub

Merge branch 'master' into develop

......@@ -21,13 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-grpc-define</artifactId>
<packaging>jar</packaging>
</project>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.grpc;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class StreamModule extends Module {
public class AgentGRPCModule extends Module {
public static final String NAME = "stream";
public static final String NAME = "agent_gRPC";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.agent.AgentModule
\ No newline at end of file
org.skywalking.apm.collector.agent.grpc.AgentGRPCModule
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-grpc-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
......@@ -27,8 +26,7 @@ import org.skywalking.apm.collector.agent.grpc.handler.ServiceNameDiscoveryServi
import org.skywalking.apm.collector.agent.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -39,10 +37,7 @@ import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -58,7 +53,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentGRPCModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -70,7 +65,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
moduleRegisterService.register(AgentGRPCModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
AgentGRPCNamingListener namingListener = new AgentGRPCNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -82,7 +77,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}
......@@ -91,14 +85,14 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server gRPCServer) {
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler());
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
}
}
......@@ -20,10 +20,9 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
......@@ -39,10 +38,10 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;
public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
......@@ -52,12 +51,7 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = 0;
try {
applicationId = applicationIDService.getOrCreate(applicationCode);
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int applicationId = applicationIDService.getOrCreate(applicationCode);
if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
......
......@@ -21,10 +21,9 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
......@@ -43,21 +42,16 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final InstanceIDService instanceIDService;
private final IInstanceIDService instanceIDService;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = 0;
try {
instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
......@@ -68,11 +62,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
try {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
private final ICpuMetricService cpuMetricService;
private final IGCMetricService gcMetricService;
private final IMemoryMetricService memoryMetricService;
private final IMemoryPoolMetricService memoryPoolMetricService;
private final IInstanceHeartBeatService instanceHeartBeatService;
public JVMMetricsServiceHandler(ModuleManager moduleManager) {
this.cpuMetricService = moduleManager.find(AgentStreamModule.NAME).getService(ICpuMetricService.class);
this.gcMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IGCMetricService.class);
this.memoryMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryMetricService.class);
this.memoryPoolMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryPoolMetricService.class);
this.instanceHeartBeatService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToInstanceHeartBeatService(instanceId, metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void sendToInstanceHeartBeatService(int instanceId, long heartBeatTime) {
instanceHeartBeatService.send(instanceId, heartBeatTime);
}
private void sendToMemoryMetricService(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> memoryMetricService.send(instanceId, timeBucket, memory.getIsHeap(), memory.getInit(), memory.getMax(), memory.getUsed(), memory.getCommitted()));
}
private void sendToMemoryPoolMetricService(int instanceId, long timeBucket,
List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> memoryPoolMetricService.send(instanceId, timeBucket, memoryPool.getType().getNumber(), memoryPool.getInit(), memoryPool.getMax(), memoryPool.getUsed(), memoryPool.getCommited()));
}
private void sendToCpuMetricService(int instanceId, long timeBucket, CPU cpu) {
cpuMetricService.send(instanceId, timeBucket, cpu.getUsagePercent());
}
private void sendToGCMetricService(int instanceId, long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> gcMetricService.send(instanceId, timeBucket, gc.getPhraseValue(), gc.getCount(), gc.getTime()));
}
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
......@@ -38,10 +39,10 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private final IServiceNameService serviceNameService;
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public void discovery(ServiceNameCollection request,
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
......@@ -35,18 +36,17 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse(moduleManager);
segmentParse.parse(segment, SegmentParse.Source.Agent);
traceSegmentService.send(segment);
}
@Override public void onError(Throwable throwable) {
......
......@@ -37,8 +37,7 @@ public class AgentGRPCNamingHandler extends JettyHandler {
}
@Override public String pathSpec() {
// return "/agent/gRPC";
return "/agentstream/grpc";
return "/agent/gRPC";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.grpc.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.AgentGRPCModule;
import org.skywalking.apm.collector.agent.grpc.AgentModuleGRPCProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentGRPCNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
public static final String PATH = "/" + AgentGRPCModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
@Override public String path() {
return PATH;
......
......@@ -21,25 +21,24 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-stream</artifactId>
<packaging>jar</packaging>
<artifactId>apm-collector-agent-grpc</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-grpc-define</module>
<module>collector-agent-grpc-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cache-define</artifactId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -21,12 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent;
package org.skywalking.apm.collector.agent.jetty;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class AgentModule extends Module {
public class AgentJettyModule extends Module {
public static final String NAME = "agent";
public static final String NAME = "agent_jetty";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModule
\ No newline at end of file
org.skywalking.apm.collector.agent.jetty.AgentJettyModule
\ No newline at end of file
......@@ -21,7 +21,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
......@@ -33,17 +33,27 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-jetty-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-jetty-manager-define</artifactId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
......@@ -19,11 +19,13 @@
package org.skywalking.apm.collector.agent.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -35,8 +37,6 @@ import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -53,7 +53,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentJettyModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -66,7 +66,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
String contextPath = config.getProperty(CONTEXT_PATH);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
moduleRegisterService.register(AgentJettyModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
AgentJettyNamingListener namingListener = new AgentJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -85,10 +85,13 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, JettyManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
jettyServer.addHandler(new TraceSegmentServletHandler(getManager()));
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
private final IApplicationIDService applicationIDService;
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";
public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
this.applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public String pathSpec() {
return "/application/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i).getAsString();
int applicationId = applicationIDService.getOrCreate(applicationCode);
JsonObject mapping = new JsonObject();
mapping.addProperty(APPLICATION_CODE, applicationCode);
mapping.addProperty(APPLICATION_ID, applicationId);
responseArray.add(mapping);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceDiscoveryServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
private final IInstanceIDService instanceIDService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
private static final String REGISTER_TIME = "rt";
private static final String INSTANCE_ID = "ii";
private static final String OS_INFO = "oi";
public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override public String pathSpec() {
return "/instance/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonObject responseJson = new JsonObject();
try {
JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject();
int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString());
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseJson;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final IServiceNameService serviceNameService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
private static final String SERVICE_ID = "si";
private static final String ELEMENT = "el";
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public String pathSpec() {
return "/servicename/discovery";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
for (JsonElement service : services) {
int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
if (serviceId != 0) {
JsonObject responseJson = new JsonObject();
responseJson.addProperty(SERVICE_ID, serviceId);
responseJson.add(ELEMENT, service);
responseArray.add(responseJson);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
......@@ -25,7 +25,9 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
......@@ -38,6 +40,12 @@ public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServletHandler(ModuleManager moduleManager) {
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public String pathSpec() {
return "/segments";
}
......@@ -64,9 +72,8 @@ public class TraceSegmentServletHandler extends JettyHandler {
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(null);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
traceSegmentService.send(traceSegment.getUpstreamSegment());
}
reader.endArray();
}
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.jetty.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.AgentJettyModule;
import org.skywalking.apm.collector.agent.jetty.AgentModuleJettyProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentJettyNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleJettyProvider.NAME;
public static final String PATH = "/" + AgentJettyModule.NAME + "/" + AgentModuleJettyProvider.NAME;
@Override public String path() {
return PATH;
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement application = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/application/register", application.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum HttpClientTools {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(HttpClientTools.class);
public String get(String url, List<NameValuePair> params) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpget = new HttpGet(url);
String paramStr = EntityUtils.toString(new UrlEncodedFormEntity(params));
httpget.setURI(new URI(httpget.getURI().toString() + "?" + paramStr));
logger.debug("executing get request {}", httpget.getURI());
try (CloseableHttpResponse response = httpClient.execute(httpget)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
public String post(String url, String data) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpPost httppost = new HttpPost(url);
httppost.setEntity(new StringEntity(data, Consts.UTF_8));
logger.debug("executing post request {}", httppost.getURI());
try (CloseableHttpResponse response = httpClient.execute(httppost)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class InstanceRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/instance/register", instance.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.FileNotFoundException;
import java.io.FileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum JsonFileReader {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(JsonFileReader.class);
public JsonElement read(String fileName) throws FileNotFoundException {
String path = this.getClass().getClassLoader().getResource(fileName).getFile();
logger.debug("path: {}", path);
JsonParser jsonParser = new JsonParser();
return jsonParser.parse(new FileReader(path));
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class SegmentPost {
public static void main(String[] args) throws IOException {
ApplicationRegisterPost applicationRegisterPost = new ApplicationRegisterPost();
applicationRegisterPost.send("json/application-register-consumer.json");
applicationRegisterPost.send("json/application-register-provider.json");
InstanceRegisterPost instanceRegisterPost = new InstanceRegisterPost();
instanceRegisterPost.send("json/instance-register-consumer.json");
instanceRegisterPost.send("json/instance-register-provider.json");
ServiceNameRegisterPost serviceNameRegisterPost = new ServiceNameRegisterPost();
serviceNameRegisterPost.send("json/servicename-register-consumer.json");
serviceNameRegisterPost.send("json/servicename-register-provider.json");
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
for (int i = 0; i < 1000; i++) {
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/servicename/discovery", instance.toString());
}
}
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"ii": 1,
"rs": [],
"ss": [
{
"si": 1,
"tv": 1,
"lv": 1,
"ps": 0,
"st": 1501858094526,
"et": 1501858097004,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "172.25.0.4:20880",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
],
"lo": []
},
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858092409,
"et": 1501858097033,
"ci": 1,
"cn": "",
"oi": 0,
"on": "/dubbox-case/case/dubbox-rest",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "http://localhost:18080/dubbox-case/case/dubbox-rest"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
137150,
185809,
48780000
],
"ai": 2,
"ii": 2,
"rs": [
{
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"si": 1,
"vi": 0,
"vn": "/dubbox-case/case/dubbox-rest",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
}
],
"ss": [
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858094726,
"et": 1501858096804,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
{
"ai": -1,
"au": "dubbox-consumer",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
{
"ai": 2,
"au": "dubbox-provider",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
[
{
"ai": -1,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
[
{
"ai": 2,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agent-jetty</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-jetty-define</module>
<module>collector-agent-jetty-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -21,20 +21,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-provider</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -16,37 +16,53 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* @author peng-yongsheng
*/
public class StreamModuleProvider extends ModuleProvider {
public class AgentStreamModule extends Module {
public static final String NAME = "agent_stream";
@Override public String name() {
return "worker";
return NAME;
}
@Override public Class<? extends Module> module() {
return StreamModule.class;
}
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
}
addRegisterService(classes);
addJVMService(classes);
classes.add(ITraceSegmentService.class);
@Override public void start(Properties config) throws ServiceNotProvidedException {
return classes.toArray(new Class[] {});
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
private void addRegisterService(List<Class> classes) {
classes.add(IApplicationIDService.class);
classes.add(IInstanceIDService.class);
classes.add(IServiceNameService.class);
}
@Override public String[] requiredModules() {
return new String[] {};
private void addJVMService(List<Class> classes) {
classes.add(ICpuMetricService.class);
classes.add(IGCMetricService.class);
classes.add(IMemoryMetricService.class);
classes.add(IMemoryPoolMetricService.class);
classes.add(IInstanceHeartBeatService.class);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ICpuMetricService extends Service {
void send(int instanceId, long timeBucket, double usagePercent);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IGCMetricService extends Service {
void send(int instanceId, long timeBucket, int phraseValue, long count, long time);
}
......@@ -16,13 +16,13 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public class SegmentBufferConfig {
public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/";
public interface IInstanceHeartBeatService extends Service {
void send(int instanceId, long heartBeatTime);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryMetricService extends Service {
void send(int instanceId, long timeBucket, boolean isHeap, long init, long max, long used, long commited);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryPoolMetricService extends Service {
void send(int instanceId, long timeBucket, int poolType, long init, long max, long used, long commited);
}
......@@ -16,13 +16,13 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.service;
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface RemoteServerService extends Service {
void registerReceiver(DataReceiver receiver);
public interface IApplicationIDService extends Service {
int getOrCreate(String applicationCode);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IInstanceIDService extends Service {
int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo);
void recover(int instanceId, int applicationId, long registerTime, String osInfo);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IServiceNameService extends Service {
int getOrCreate(int applicationId, String serviceName);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.trace;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public interface ITraceSegmentService extends Service {
void send(UpstreamSegment segment);
}
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModuleProvider
\ No newline at end of file
org.skywalking.apm.collector.agent.stream.AgentStreamModule
\ No newline at end of file
......@@ -21,30 +21,35 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<artifactId>collector-agent-stream-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -22,34 +22,23 @@ import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
*/
public class AgentStreamSingleton {
private static AgentStreamSingleton INSTANCE;
public class AgentStreamBootStartup {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
private AgentStreamSingleton(ModuleManager moduleManager) {
public AgentStreamBootStartup(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = new WorkerCreateListener();
this.create();
}
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}
private void create() {
public void start() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import org.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.agent.stream.worker.AgentStreamRemoteDataRegister;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.worker.trace.TraceSegmentService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class AgentStreamModuleProvider extends ModuleProvider {
@Override public String name() {
return "default";
}
@Override public Class<? extends Module> module() {
return AgentStreamModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationIDService.class, new ApplicationIDService(getManager()));
this.registerServiceImplementation(IInstanceIDService.class, new InstanceIDService(getManager()));
this.registerServiceImplementation(IServiceNameService.class, new ServiceNameService(getManager()));
this.registerServiceImplementation(ICpuMetricService.class, new CpuMetricService());
this.registerServiceImplementation(IGCMetricService.class, new GCMetricService());
this.registerServiceImplementation(IMemoryMetricService.class, new MemoryMetricService());
this.registerServiceImplementation(IMemoryPoolMetricService.class, new MemoryPoolMetricService());
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
this.registerServiceImplementation(ITraceSegmentService.class, new TraceSegmentService(getManager()));
BufferFileConfig.Parser parser = new BufferFileConfig.Parser();
parser.parse(config);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
AgentStreamRemoteDataRegister agentStreamRemoteDataRegister = new AgentStreamRemoteDataRegister(remoteDataRegisterService);
agentStreamRemoteDataRegister.register();
AgentStreamBootStartup bootStartup = new AgentStreamBootStartup(getManager());
bootStartup.start();
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {StorageModule.NAME, CacheModule.NAME};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class BufferFileConfig {
static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
static String BUFFER_PATH = "../buffer/";
private static final String BUFFER_PATH_KEY = "buffer_file_path";
private static final String BUFFER_OFFSET_MAX_FILE_SIZE_KEY = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE_KEY = "buffer_segment_max_file_size";
public static class Parser {
public void parse(Properties config) {
if (config.containsKey(BUFFER_PATH_KEY)) {
BUFFER_PATH = config.getProperty(BUFFER_PATH_KEY);
}
if (config.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_OFFSET_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
if (config.containsKey(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
}
}
}
......@@ -49,7 +49,7 @@ public enum OffsetManager {
public synchronized void initialize() throws IOException {
if (!initialized) {
this.offset = new Offset();
File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
File dataPath = new File(BufferFileConfig.BUFFER_PATH);
if (dataPath.mkdirs()) {
createOffsetFile();
} else {
......@@ -70,14 +70,14 @@ public enum OffsetManager {
offset.deserialize(offsetRecord);
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS);
}
}
private void createOffsetFile() throws IOException {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
offsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
this.offset.getWriteOffset().setWriteFileOffset(0);
this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
......@@ -99,7 +99,7 @@ public enum OffsetManager {
private void nextFile() {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
File newOffsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
offsetFile.delete();
offsetFile = newOffsetFile;
this.flush();
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.buffer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -39,18 +40,18 @@ public enum SegmentBufferManager {
public static final String DATA_FILE_PREFIX = "data";
private FileOutputStream outputStream;
public synchronized void initialize() {
public synchronized void initialize(ModuleManager moduleManager) {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) {
if (new File(BufferFileConfig.BUFFER_PATH).mkdirs()) {
newDataFile();
} else {
String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
if (StringUtils.isNotEmpty(writeFileName)) {
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
if (dataFile.exists()) {
outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true);
outputStream = new FileOutputStream(new File(BufferFileConfig.BUFFER_PATH + writeFileName), true);
} else {
newDataFile();
}
......@@ -58,7 +59,7 @@ public enum SegmentBufferManager {
newDataFile();
}
}
SegmentBufferReader.INSTANCE.initialize();
SegmentBufferReader.INSTANCE.initialize(moduleManager);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
......@@ -82,7 +83,7 @@ public enum SegmentBufferManager {
logger.debug("create new segment buffer file");
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
dataFile.createNewFile();
OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0);
try {
......@@ -97,6 +98,5 @@ public enum SegmentBufferManager {
}
public synchronized void flush() {
}
}
......@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -42,15 +43,17 @@ public enum SegmentBufferReader {
private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class);
private InputStream inputStream;
private ModuleManager moduleManager;
public void initialize() {
public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
}
private void preRead() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
if (StringUtils.isNotEmpty(readFileName)) {
File readFile = new File(SegmentBufferConfig.BUFFER_PATH + readFileName);
File readFile = new File(BufferFileConfig.BUFFER_PATH + readFileName);
if (readFile.exists()) {
deleteTheDataFilesBeforeReadFile(readFileName);
long readFileOffset = OffsetManager.INSTANCE.getReadFileOffset();
......@@ -66,7 +69,7 @@ public enum SegmentBufferReader {
}
private void deleteTheDataFilesBeforeReadFile(String readFileName) {
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
long readFileCreateTime = getFileCreateTime(readFileName);
for (File dataFile : dataFiles) {
......@@ -87,7 +90,7 @@ public enum SegmentBufferReader {
private void readEarliestCreateDataFile() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
if (CollectionUtils.isNotEmpty(dataFiles)) {
if (dataFiles[0].getName().equals(readFileName)) {
......@@ -117,7 +120,7 @@ public enum SegmentBufferReader {
while (readFile.length() > readFileOffset && readFileOffset < endPoint) {
UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream);
SegmentParse parse = new SegmentParse(null);
SegmentParse parse = new SegmentParse(moduleManager);
if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) {
return false;
}
......
......@@ -163,7 +163,7 @@ public class SegmentParse {
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("send to segment buffer write worker, id: {}", id);
logger.debug("push to segment buffer write worker, id: {}", id);
SegmentStandardization standardization = new SegmentStandardization(id);
standardization.setUpstreamSegment(upstreamSegment);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
......
......@@ -18,9 +18,9 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
......@@ -36,9 +36,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
private final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class);
private static ReferenceIdExchanger EXCHANGER;
private ServiceNameService serviceNameService;
private final ServiceNameService serviceNameService;
private final ApplicationIDService applicationIDService;
private final InstanceCacheService instanceCacheService;
private final ApplicationCacheService applicationCacheService;
public static ReferenceIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -48,9 +48,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
private ReferenceIdExchanger(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
serviceNameService = new ServiceNameService(moduleManager);
instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
......@@ -58,6 +58,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int entryServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName());
if (entryServiceId == 0) {
if (logger.isDebugEnabled()) {
int entryApplicationId = instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId());
logger.debug("entry service name: {} from application id: {} exchange failed", standardBuilder.getEntryServiceName(), entryApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -70,6 +74,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int parentServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName());
if (parentServiceId == 0) {
if (logger.isDebugEnabled()) {
int parentApplicationId = instanceCacheService.get(standardBuilder.getParentApplicationInstanceId());
logger.debug("parent service name: {} from application id: {} exchange failed", standardBuilder.getParentServiceName(), parentApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -79,8 +87,11 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = applicationCacheService.get(standardBuilder.getNetworkAddress());
int networkAddressId = applicationIDService.getOrCreate(standardBuilder.getNetworkAddress());
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network address: {} from application id: {} exchange failed", standardBuilder.getNetworkAddress(), applicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......
......@@ -38,7 +38,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager);
SegmentBufferManager.INSTANCE.initialize();
SegmentBufferManager.INSTANCE.initialize(moduleManager);
}
@Override public int id() {
......
......@@ -18,9 +18,8 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -35,8 +34,8 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
private final Logger logger = LoggerFactory.getLogger(SpanIdExchanger.class);
private static SpanIdExchanger EXCHANGER;
private final ApplicationIDService applicationIDService;
private final ServiceNameService serviceNameService;
private final ApplicationCacheService applicationCacheService;
public static SpanIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -45,15 +44,16 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
return EXCHANGER;
}
public SpanIdExchanger(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
private SpanIdExchanger(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
this.serviceNameService = new ServiceNameService(moduleManager);
}
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = applicationCacheService.get(standardBuilder.getPeer());
int peerId = applicationIDService.getOrCreate(standardBuilder.getPeer());
if (peerId == 0) {
logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......@@ -66,6 +66,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getOperationName());
if (operationNameId == 0) {
logger.debug("service name: {} from application id: {} exchange failed", standardBuilder.getOperationName(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
/**
* @author peng-yongsheng
*/
public class AgentStreamRemoteDataRegister {
private final RemoteDataRegisterService remoteDataRegisterService;
public AgentStreamRemoteDataRegister(RemoteDataRegisterService remoteDataRegisterService) {
this.remoteDataRegisterService = remoteDataRegisterService;
}
public void register() {
remoteDataRegisterService.register(Application.class, Application::new);
remoteDataRegisterService.register(Instance.class, Instance::new);
remoteDataRegisterService.register(ServiceName.class, ServiceName::new);
remoteDataRegisterService.register(NodeComponent.class, NodeComponent::new);
remoteDataRegisterService.register(NodeMapping.class, NodeMapping::new);
remoteDataRegisterService.register(NodeReference.class, NodeReference::new);
remoteDataRegisterService.register(ServiceEntry.class, ServiceEntry::new);
remoteDataRegisterService.register(ServiceReference.class, ServiceReference::new);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CpuMetricService implements ICpuMetricService {
private final Logger logger = LoggerFactory.getLogger(CpuMetricService.class);
private Graph<CpuMetric> cpuMetricGraph;
private Graph<CpuMetric> getCpuMetricGraph() {
if (ObjectUtils.isEmpty(cpuMetricGraph)) {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class);
}
return cpuMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, double usagePercent) {
CpuMetric cpuMetric = new CpuMetric(timeBucket + Const.ID_SPLIT + instanceId);
cpuMetric.setInstanceId(instanceId);
cpuMetric.setUsagePercent(usagePercent);
cpuMetric.setTimeBucket(timeBucket);
logger.debug("push to cpu metric graph, id: {}", cpuMetric.getId());
getCpuMetricGraph().start(cpuMetric);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GCMetricService implements IGCMetricService {
private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
private Graph<GCMetric> gcMetricGraph;
private Graph<GCMetric> getGcMetricGraph() {
if (ObjectUtils.isEmpty(gcMetricGraph)) {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class);
}
return gcMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, int phraseValue, long count, long time) {
GCMetric gcMetric = new GCMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(phraseValue));
gcMetric.setInstanceId(instanceId);
gcMetric.setPhrase(phraseValue);
gcMetric.setCount(count);
gcMetric.setTime(time);
gcMetric.setTimeBucket(timeBucket);
logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
getGcMetricGraph().start(gcMetric);
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册