提交 0c0a480a 编写于 作者: P peng-yongsheng

Application, instance, service name register by http json test success.

Trace segment compute by http json stream test success.
上级 8c3fbe4c
......@@ -23,8 +23,6 @@ import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
......@@ -52,12 +50,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = 0;
try {
instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
......@@ -68,11 +61,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
try {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement application = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/application/register", application.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum HttpClientTools {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(HttpClientTools.class);
public String get(String url, List<NameValuePair> params) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpget = new HttpGet(url);
String paramStr = EntityUtils.toString(new UrlEncodedFormEntity(params));
httpget.setURI(new URI(httpget.getURI().toString() + "?" + paramStr));
logger.debug("executing get request {}", httpget.getURI());
try (CloseableHttpResponse response = httpClient.execute(httpget)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
public String post(String url, String data) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpPost httppost = new HttpPost(url);
httppost.setEntity(new StringEntity(data, Consts.UTF_8));
logger.debug("executing post request {}", httppost.getURI());
try (CloseableHttpResponse response = httpClient.execute(httppost)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
httpClient.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class InstanceRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/instance/register", instance.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.FileNotFoundException;
import java.io.FileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public enum JsonFileReader {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(JsonFileReader.class);
public JsonElement read(String fileName) throws FileNotFoundException {
String path = this.getClass().getClassLoader().getResource(fileName).getFile();
logger.debug("path: {}", path);
JsonParser jsonParser = new JsonParser();
return jsonParser.parse(new FileReader(path));
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class SegmentPost {
public static void main(String[] args) throws IOException {
ApplicationRegisterPost applicationRegisterPost = new ApplicationRegisterPost();
applicationRegisterPost.send("json/application-register-consumer.json");
applicationRegisterPost.send("json/application-register-provider.json");
InstanceRegisterPost instanceRegisterPost = new InstanceRegisterPost();
instanceRegisterPost.send("json/instance-register-consumer.json");
instanceRegisterPost.send("json/instance-register-provider.json");
ServiceNameRegisterPost serviceNameRegisterPost = new ServiceNameRegisterPost();
serviceNameRegisterPost.send("json/servicename-register-consumer.json");
serviceNameRegisterPost.send("json/servicename-register-provider.json");
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterPost {
public void send(String jsonFile) throws IOException {
JsonElement instance = JsonFileReader.INSTANCE.read(jsonFile);
HttpClientTools.INSTANCE.post("http://localhost:12800/servicename/discovery", instance.toString());
}
}
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"ii": 1,
"rs": [],
"ss": [
{
"si": 1,
"tv": 1,
"lv": 1,
"ps": 0,
"st": 1501858094526,
"et": 1501858097004,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "172.25.0.4:20880",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
],
"lo": []
},
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858092409,
"et": 1501858097033,
"ci": 1,
"cn": "",
"oi": 0,
"on": "/dubbox-case/case/dubbox-rest",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "http://localhost:18080/dubbox-case/case/dubbox-rest"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
[
{
"gt": [
[
230150,
185809,
24040000
]
],
"sg": {
"ts": [
137150,
185809,
48780000
],
"ai": 2,
"ii": 2,
"rs": [
{
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"si": 1,
"vi": 0,
"vn": "/dubbox-case/case/dubbox-rest",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
}
],
"ss": [
{
"si": 0,
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858094726,
"et": 1501858096804,
"ci": 3,
"cn": "",
"oi": 0,
"on": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"pi": 0,
"pn": "",
"ie": false,
"to": [
{
"k": "url",
"v": "rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
},
{
"k": "http.method",
"v": "GET"
}
],
"lo": []
}
]
}
}
]
\ No newline at end of file
{
"ai": -1,
"au": "dubbox-consumer",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
{
"ai": 2,
"au": "dubbox-provider",
"rt": 1501858094526,
"oi": {
"any_name": "any_value",
"any_name1": "any_value1"
}
}
\ No newline at end of file
[
{
"ai": -1,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
[
{
"ai": 2,
"sn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
}
]
\ No newline at end of file
......@@ -18,9 +18,9 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
......@@ -36,9 +36,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
private final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class);
private static ReferenceIdExchanger EXCHANGER;
private ServiceNameService serviceNameService;
private final ServiceNameService serviceNameService;
private final ApplicationIDService applicationIDService;
private final InstanceCacheService instanceCacheService;
private final ApplicationCacheService applicationCacheService;
public static ReferenceIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -48,9 +48,9 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
private ReferenceIdExchanger(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
serviceNameService = new ServiceNameService(moduleManager);
instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
......@@ -58,6 +58,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int entryServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName());
if (entryServiceId == 0) {
if (logger.isDebugEnabled()) {
int entryApplicationId = instanceCacheService.get(standardBuilder.getEntryApplicationInstanceId());
logger.debug("entry service name: {} from application id: {} exchange failed", standardBuilder.getEntryServiceName(), entryApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -70,6 +74,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
int parentServiceId = serviceNameService.getOrCreate(instanceCacheService.get(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName());
if (parentServiceId == 0) {
if (logger.isDebugEnabled()) {
int parentApplicationId = instanceCacheService.get(standardBuilder.getParentApplicationInstanceId());
logger.debug("parent service name: {} from application id: {} exchange failed", standardBuilder.getParentServiceName(), parentApplicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......@@ -79,8 +87,11 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = applicationCacheService.get(standardBuilder.getNetworkAddress());
int networkAddressId = applicationIDService.getOrCreate(standardBuilder.getNetworkAddress());
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network address: {} from application id: {} exchange failed", standardBuilder.getNetworkAddress(), applicationId);
}
return false;
} else {
standardBuilder.toBuilder();
......
......@@ -18,9 +18,8 @@
package org.skywalking.apm.collector.agent.stream.parser.standardization;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -35,8 +34,8 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
private final Logger logger = LoggerFactory.getLogger(SpanIdExchanger.class);
private static SpanIdExchanger EXCHANGER;
private final ApplicationIDService applicationIDService;
private final ServiceNameService serviceNameService;
private final ApplicationCacheService applicationCacheService;
public static SpanIdExchanger getInstance(ModuleManager moduleManager) {
if (EXCHANGER == null) {
......@@ -45,15 +44,16 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
return EXCHANGER;
}
public SpanIdExchanger(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
private SpanIdExchanger(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
this.serviceNameService = new ServiceNameService(moduleManager);
}
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = applicationCacheService.get(standardBuilder.getPeer());
int peerId = applicationIDService.getOrCreate(standardBuilder.getPeer());
if (peerId == 0) {
logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......@@ -66,6 +66,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getOperationName());
if (operationNameId == 0) {
logger.debug("service name: {} from application id: {} exchange failed", standardBuilder.getOperationName(), applicationId);
return false;
} else {
standardBuilder.toBuilder();
......
......@@ -23,10 +23,10 @@ ui:
host: localhost
port: 12800
context_path: /
#storage:
# elasticsearch:
# cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
\ No newline at end of file
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
\ No newline at end of file
......@@ -26,6 +26,8 @@ import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleListenerService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleRegisterService;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -75,7 +77,11 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
try {
dataMonitor.start();
} catch (CollectorException e) {
throw new UnexpectedException(e.getMessage());
}
}
@Override public String[] requiredModules() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册