未验证 提交 8658ca77 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #592 from OpenSkywalking/feature/stream-jetty

Feature/stream jetty
......@@ -22,8 +22,6 @@ import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
......@@ -52,12 +50,7 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = 0;
try {
applicationId = applicationIDService.getOrCreate(applicationCode);
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int applicationId = applicationIDService.getOrCreate(applicationCode);
if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
......
......@@ -23,8 +23,6 @@ import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
......@@ -52,12 +50,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = 0;
try {
instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
......@@ -68,11 +61,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
try {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
......
......@@ -20,6 +20,9 @@ package org.skywalking.apm.collector.agent.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
......@@ -89,6 +92,9 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
jettyServer.addHandler(new TraceSegmentServletHandler(getManager()));
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
private final ApplicationIDService applicationIDService;
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";
public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
}
@Override public String pathSpec() {
return "/application/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i).getAsString();
int applicationId = applicationIDService.getOrCreate(applicationCode);
JsonObject mapping = new JsonObject();
mapping.addProperty(APPLICATION_CODE, applicationCode);
mapping.addProperty(APPLICATION_ID, applicationId);
responseArray.add(mapping);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceDiscoveryServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
private final InstanceIDService instanceIDService;
private Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
private static final String REGISTER_TIME = "rt";
private static final String INSTANCE_ID = "ii";
private static final String OS_INFO = "oi";
public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
}
@Override public String pathSpec() {
return "/instance/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonObject responseJson = new JsonObject();
try {
JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject();
int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString());
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseJson;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
private static final String SERVICE_ID = "si";
private static final String ELEMENT = "el";
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
}
@Override public String pathSpec() {
return "/servicename/discovery";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
for (JsonElement service : services) {
int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
if (serviceId != 0) {
JsonObject responseJson = new JsonObject();
responseJson.addProperty(SERVICE_ID, serviceId);
responseJson.add(ELEMENT, service);
responseArray.add(responseJson);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
......@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
......@@ -38,6 +39,12 @@ public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
private final ModuleManager moduleManager;
public TraceSegmentServletHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public String pathSpec() {
return "/segments";
}
......@@ -64,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyHandler {
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(null);
SegmentParse segmentParse = new SegmentParse(moduleManager);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement application = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/application/register", application.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum HttpClientTools {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(HttpClientTools.class);
public String get(String url, List<NameValuePair> params) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpget = new HttpGet(url);
String paramStr = EntityUtils.toString(new UrlEncodedFormEntity(params));
httpget.setURI(new URI(httpget.getURI().toString() + "?" + paramStr));
logger.debug("executing get request {}", httpget.getURI());
try (CloseableHttpResponse response = httpClient.execute(httpget)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
public String post(String url, String data) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpPost httppost = new HttpPost(url);
httppost.setEntity(new StringEntity(data, Consts.UTF_8));
logger.debug("executing post request {}", httppost.getURI());
try (CloseableHttpResponse response = httpClient.execute(httppost)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class InstanceRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/instance/register", instance.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.FileNotFoundException;
import java.io.FileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum JsonFileReader {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(JsonFileReader.class);
public JsonElement read(String fileName) throws FileNotFoundException {
String path = this.getClass().getClassLoader().getResource(fileName).getFile();
logger.debug("path: {}", path);
JsonParser jsonParser = new JsonParser();
return jsonParser.parse(new FileReader(path));
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class SegmentPost {
public static void main(String[] args) throws IOException {
ApplicationRegisterPost applicationRegisterPost = new ApplicationRegisterPost();
applicationRegisterPost.send("json/application-register-consumer.json");
applicationRegisterPost.send("json/application-register-provider.json");
InstanceRegisterPost instanceRegisterPost = new InstanceRegisterPost();
instanceRegisterPost.send("json/instance-register-consumer.json");
instanceRegisterPost.send("json/instance-register-provider.json");
ServiceNameRegisterPost serviceNameRegisterPost = new ServiceNameRegisterPost();
serviceNameRegisterPost.send("json/servicename-register-consumer.json");
serviceNameRegisterPost.send("json/servicename-register-provider.json");
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/servicename/discovery", instance.toString());
}
}
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"ii": 1,
"rs": [],
"ss": [
{
"si": 1,
"tv": 1,
"lv": 1,
"ps": 0,
"st": 1501858094526,
"et": 1501858097004,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "172.25.0.4:20880",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
],
"lo": []
},
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858092409,
"et": 1501858097033,
"ci": 1,
"cn": "",
"oi": 0,
"on": "/dubbox-case/case/dubbox-rest",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "http://localhost:18080/dubbox-case/case/dubbox-rest"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
137150,
185809,
48780000
],
"ai": 2,
"ii": 2,
"rs": [
{
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"si": 1,
"vi": 0,
"vn": "/dubbox-case/case/dubbox-rest",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
}
],
"ss": [
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858094726,
"et": 1501858096804,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
{
"ai": -1,
"au": "dubbox-consumer",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
{
"ai": 2,
"au": "dubbox-provider",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
[
{
"ai": -1,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
[
{
"ai": 2,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.buffer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -39,7 +40,7 @@ public enum SegmentBufferManager {
public static final String DATA_FILE_PREFIX = "data";
private FileOutputStream outputStream;
public synchronized void initialize() {
public synchronized void initialize(ModuleManager moduleManager) {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
......@@ -58,7 +59,7 @@ public enum SegmentBufferManager {
newDataFile();
}
}
SegmentBufferReader.INSTANCE.initialize();
SegmentBufferReader.INSTANCE.initialize(moduleManager);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -42,8 +43,10 @@ public enum SegmentBufferReader {
private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class);
private InputStream inputStream;
private ModuleManager moduleManager;
public void initialize() {
public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
}
......@@ -117,7 +120,7 @@ public enum SegmentBufferReader {
while (readFile.length() > readFileOffset && readFileOffset < endPoint) {
UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream);
SegmentParse parse = new SegmentParse(null);
SegmentParse parse = new SegmentParse(moduleManager);
if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) {
return false;
}
......
......@@ -18,9 +18,9 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
......@@ -36,9 +36,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
private final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class);
private static ReferenceIdExchanger EXCHANGER;
private ServiceNameService serviceNameService;
private final ServiceNameService serviceNameService;
private final ApplicationIDService applicationIDService;
private final InstanceCacheService instanceCacheService;
private final ApplicationCacheService applicationCacheService;
public static ReferenceIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -48,9 +48,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
private ReferenceIdExchanger(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
serviceNameService = new ServiceNameService(moduleManager);
instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
......@@ -58,6 +58,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int entryServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName());
if (entryServiceId == 0) {
if (logger.isDebugEnabled()) {
int entryApplicationId = instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId());
logger.debug("entry service name: {} from application id: {} exchange failed", standardBuilder.getEntryServiceName(), entryApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -70,6 +74,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int parentServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName());
if (parentServiceId == 0) {
if (logger.isDebugEnabled()) {
int parentApplicationId = instanceCacheService.get(standardBuilder.getParentApplicationInstanceId());
logger.debug("parent service name: {} from application id: {} exchange failed", standardBuilder.getParentServiceName(), parentApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -79,8 +87,11 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = applicationCacheService.get(standardBuilder.getNetworkAddress());
int networkAddressId = applicationIDService.getOrCreate(standardBuilder.getNetworkAddress());
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network address: {} from application id: {} exchange failed", standardBuilder.getNetworkAddress(), applicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......
......@@ -38,7 +38,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager);
SegmentBufferManager.INSTANCE.initialize();
SegmentBufferManager.INSTANCE.initialize(moduleManager);
}
@Override public int id() {
......
......@@ -18,9 +18,8 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -35,8 +34,8 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
private final Logger logger = LoggerFactory.getLogger(SpanIdExchanger.class);
private static SpanIdExchanger EXCHANGER;
private final ApplicationIDService applicationIDService;
private final ServiceNameService serviceNameService;
private final ApplicationCacheService applicationCacheService;
public static SpanIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -45,15 +44,16 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
return EXCHANGER;
}
public SpanIdExchanger(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
private SpanIdExchanger(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
this.serviceNameService = new ServiceNameService(moduleManager);
}
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = applicationCacheService.get(standardBuilder.getPeer());
int peerId = applicationIDService.getOrCreate(standardBuilder.getPeer());
if (peerId == 0) {
logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......@@ -66,6 +66,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getOperationName());
if (operationNameId == 0) {
logger.debug("service name: {} from application id: {} exchange failed", standardBuilder.getOperationName(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......
......@@ -24,8 +24,6 @@ import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,7 +43,7 @@ public class ApplicationIDService {
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}
public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException {
public int getOrCreate(String applicationCode) {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode);
......
......@@ -24,8 +24,6 @@ import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
......@@ -49,8 +47,7 @@ public class InstanceIDService {
this.instanceRegisterDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceRegisterDAO.class);
}
public int getOrCreate(int applicationId, String agentUUID, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
InstanceCacheService service = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
int instanceId = service.getInstanceId(applicationId, agentUUID);
......@@ -69,8 +66,7 @@ public class InstanceIDService {
return instanceId;
}
public void recover(int instanceId, int applicationId, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
Instance instance = new Instance(String.valueOf(instanceId));
......
......@@ -23,10 +23,10 @@ ui:
host: localhost
port: 12800
context_path: /
#storage:
# elasticsearch:
# cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
\ No newline at end of file
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
\ No newline at end of file
......@@ -26,6 +26,8 @@ import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleListenerService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleRegisterService;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -75,7 +77,11 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
try {
dataMonitor.start();
} catch (CollectorException e) {
throw new UnexpectedException(e.getMessage());
}
}
@Override public String[] requiredModules() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册