diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java index d6f6649a850d5df3f461620ac4a8b13e23900612..1008f22c8d38df5214f159e043634c448655fba3 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java @@ -22,8 +22,6 @@ import com.google.protobuf.ProtocolStringList; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService; import org.skywalking.apm.collector.core.module.ModuleManager; -import org.skywalking.apm.collector.core.module.ModuleNotFoundException; -import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.network.proto.Application; import org.skywalking.apm.network.proto.ApplicationMapping; @@ -52,12 +50,7 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic ApplicationMapping.Builder builder = ApplicationMapping.newBuilder(); for (int i = 0; i < applicationCodes.size(); i++) { String applicationCode = applicationCodes.get(i); - int applicationId = 0; - try { - applicationId = applicationIDService.getOrCreate(applicationCode); - } catch (ModuleNotFoundException | ServiceNotProvidedException e) { - logger.error(e.getMessage(), e); - } + int applicationId = applicationIDService.getOrCreate(applicationCode); if (applicationId != 0) { KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build(); diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java index 68ac58805317220015f8dd829e932df09e664bb1..75d911b443ff232cd29df8b9d3563ca09b305249 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java @@ -20,6 +20,9 @@ package org.skywalking.apm.collector.agent.jetty; import java.util.Properties; import org.skywalking.apm.collector.agent.AgentModule; +import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler; +import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler; +import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler; import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler; import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler; import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener; @@ -89,6 +92,9 @@ public class AgentModuleJettyProvider extends ModuleProvider { } private void addHandlers(Server jettyServer) { - jettyServer.addHandler(new TraceSegmentServletHandler()); + jettyServer.addHandler(new TraceSegmentServletHandler(getManager())); + jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager())); + jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager())); + jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager())); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..5fb5db4b6a743af9841c91126e6a92e5a893dee3 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java @@ -0,0 +1,75 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ApplicationRegisterServletHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class); + + private final ApplicationIDService applicationIDService; + private Gson gson = new Gson(); + private static final String APPLICATION_CODE = "c"; + private static final String APPLICATION_ID = "i"; + + public ApplicationRegisterServletHandler(ModuleManager moduleManager) { + this.applicationIDService = new ApplicationIDService(moduleManager); + } + + @Override public String pathSpec() { + return "/application/register"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonArray responseArray = new JsonArray(); + try { + JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class); + for (int i = 0; i < applicationCodes.size(); i++) { + String applicationCode = applicationCodes.get(i).getAsString(); + int applicationId = applicationIDService.getOrCreate(applicationCode); + JsonObject mapping = new JsonObject(); + mapping.addProperty(APPLICATION_CODE, applicationCode); + mapping.addProperty(APPLICATION_ID, applicationId); + responseArray.add(mapping); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseArray; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..658d7d419d7ec5957e1904857faf5f9e8784a087 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java @@ -0,0 +1,78 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstanceDiscoveryServletHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class); + + private final InstanceIDService instanceIDService; + private Gson gson = new Gson(); + + private static final String APPLICATION_ID = "ai"; + private static final String AGENT_UUID = "au"; + private static final String REGISTER_TIME = "rt"; + private static final String INSTANCE_ID = "ii"; + private static final String OS_INFO = "oi"; + + public InstanceDiscoveryServletHandler(ModuleManager moduleManager) { + this.instanceIDService = new InstanceIDService(moduleManager); + } + + @Override public String pathSpec() { + return "/instance/register"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonObject responseJson = new JsonObject(); + try { + JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class); + int applicationId = instance.get(APPLICATION_ID).getAsInt(); + String agentUUID = instance.get(AGENT_UUID).getAsString(); + long registerTime = instance.get(REGISTER_TIME).getAsLong(); + JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject(); + + int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString()); + responseJson.addProperty(APPLICATION_ID, applicationId); + responseJson.addProperty(INSTANCE_ID, instanceId); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseJson; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..b842e17c9f0ef9b3e8e24efaeb626b91e1d91c6b --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java @@ -0,0 +1,82 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceNameDiscoveryServiceHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class); + + private final ServiceNameService serviceNameService; + private Gson gson = new Gson(); + + private static final String APPLICATION_ID = "ai"; + private static final String SERVICE_NAME = "sn"; + private static final String SERVICE_ID = "si"; + private static final String ELEMENT = "el"; + + public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) { + this.serviceNameService = new ServiceNameService(moduleManager); + } + + @Override public String pathSpec() { + return "/servicename/discovery"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonArray responseArray = new JsonArray(); + try { + JsonArray services = gson.fromJson(req.getReader(), JsonArray.class); + for (JsonElement service : services) { + int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt(); + String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString(); + + int serviceId = serviceNameService.getOrCreate(applicationId, serviceName); + if (serviceId != 0) { + JsonObject responseJson = new JsonObject(); + responseJson.addProperty(SERVICE_ID, serviceId); + responseJson.add(ELEMENT, service); + responseArray.add(responseJson); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseArray; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java index dd3ac8ed29434e2d1f16c78cf6b336b3826a316f..6c873bd77cfa9ef60608e1ac5732e403a7b8324e 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java @@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletRequest; import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment; import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader; import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; import org.skywalking.apm.collector.server.jetty.JettyHandler; import org.slf4j.Logger; @@ -38,6 +39,12 @@ public class TraceSegmentServletHandler extends JettyHandler { private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class); + private final ModuleManager moduleManager; + + public TraceSegmentServletHandler(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + @Override public String pathSpec() { return "/segments"; } @@ -64,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyHandler { reader.beginArray(); while (reader.hasNext()) { - SegmentParse segmentParse = new SegmentParse(null); + SegmentParse segmentParse = new SegmentParse(moduleManager); TraceSegment traceSegment = jsonReader.read(reader); segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent); } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java index 938d0e98038d86bad5a80e7d677c19333155e787..2e2a3d86c342aa04da678a450e7d58148b123209 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.buffer; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -39,7 +40,7 @@ public enum SegmentBufferManager { public static final String DATA_FILE_PREFIX = "data"; private FileOutputStream outputStream; - public synchronized void initialize() { + public synchronized void initialize(ModuleManager moduleManager) { logger.info("segment buffer initialize"); try { OffsetManager.INSTANCE.initialize(); @@ -58,7 +59,7 @@ public enum SegmentBufferManager { newDataFile(); } } - SegmentBufferReader.INSTANCE.initialize(); + SegmentBufferReader.INSTANCE.initialize(moduleManager); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java index a073b212b739776e219103aef1812db5a3a57258..148b33d9b06a6a3332d10f803731b5b42e2ebbf2 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; @@ -42,8 +43,10 @@ public enum SegmentBufferReader { private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class); private InputStream inputStream; + private ModuleManager moduleManager; - public void initialize() { + public void initialize(ModuleManager moduleManager) { + this.moduleManager = moduleManager; Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS); } @@ -117,7 +120,7 @@ public enum SegmentBufferReader { while (readFile.length() > readFileOffset && readFileOffset < endPoint) { UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream); - SegmentParse parse = new SegmentParse(null); + SegmentParse parse = new SegmentParse(moduleManager); if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) { return false; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java index 8fe297ad253d5dd1b3233acf49d930ed7b6546ff..0383d9327351f3cc82d86dec932edb0f71a8df14 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -38,7 +38,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker