提交 6f1f4cf1 编写于 作者: wu-sheng's avatar wu-sheng

Change network protocol for 5.0

上级 07ad767b
......@@ -25,9 +25,9 @@ import org.apache.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.apache.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,18 +45,18 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
@Override public void batchRegister(Applications request, StreamObserver<ApplicationMappings> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
ProtocolStringList applicationCodes = request.getApplicationCodesList();
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
ApplicationMappings.Builder builder = ApplicationMappings.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDService.getOrCreate(applicationCode);
if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
builder.addApplication(value);
builder.addApplications(value);
}
}
responseObserver.onNext(builder.build());
......
......@@ -21,9 +21,9 @@ package org.apache.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,8 +40,8 @@ public class ApplicationRegisterServiceHandlerTestCase {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode("test141").build();
ApplicationMapping mapping = stub.register(application);
logger.debug(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
Applications application = Applications.newBuilder().addApplicationCodes("test141").build();
ApplicationMappings mapping = stub.batchRegister(application);
logger.debug(mapping.getApplications(0).getKey() + ", " + mapping.getApplications(0).getValue());
}
}
......@@ -7,14 +7,14 @@ import "KeyWithIntegerValue.proto";
//register service for ApplicationCode, this service is called when service starts.
service ApplicationRegisterService {
rpc register (Application) returns (ApplicationMapping) {
rpc batchRegister (Applications) returns (ApplicationMappings) {
}
}
message Application {
repeated string applicationCode = 1;
message Applications {
repeated string applicationCodes = 1;
}
message ApplicationMapping {
repeated KeyWithIntegerValue application = 1;
message ApplicationMappings {
repeated KeyWithIntegerValue applications = 1;
}
......@@ -78,6 +78,7 @@ enum SpanLayer {
RPCFramework = 2;
Http = 3;
MQ = 4;
Cache = 5;
}
message LogMessage {
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.context.trace;
/**
......@@ -26,7 +25,8 @@ public enum SpanLayer {
DB(1),
RPC_FRAMEWORK(2),
HTTP(3),
MQ(4);
MQ(4),
CACHE(5);
private int code;
......@@ -42,6 +42,10 @@ public enum SpanLayer {
span.setLayer(SpanLayer.DB);
}
public static void asCache(AbstractSpan span) {
span.setLayer(SpanLayer.CACHE);
}
public static void asRPCFramework(AbstractSpan span) {
span.setLayer(SpanLayer.RPC_FRAMEWORK);
}
......
......@@ -23,9 +23,9 @@ import io.netty.util.internal.ConcurrentSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.KeyWithIntegerValue;
import static org.apache.skywalking.apm.agent.core.conf.Config.Dictionary.APPLICATION_CODE_BUFFER_SIZE;
......@@ -55,10 +55,10 @@ public enum ApplicationDictionary {
public void syncRemoteDictionary(
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addAllApplicationCode(unRegisterApplications).build());
if (applicationMapping.getApplicationCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationList()) {
ApplicationMappings applicationMapping = applicationRegisterServiceBlockingStub.batchRegister(
Applications.newBuilder().addAllApplicationCodes(unRegisterApplications).build());
if (applicationMapping.getApplicationsCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationsList()) {
unRegisterApplications.remove(keyWithIntegerValue.getKey());
applicationDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
......
......@@ -38,13 +38,13 @@ import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
......@@ -112,10 +112,10 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication(0).getValue();
ApplicationMappings applicationMapping = applicationRegisterServiceBlockingStub.batchRegister(
Applications.newBuilder().addApplicationCodes(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationsCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplications(0).getValue();
shouldTry = true;
}
}
......
......@@ -37,7 +37,7 @@ public class JedisMethodInterceptor implements InstanceMethodsAroundInterceptor
AbstractSpan span = ContextManager.createExitSpan("Jedis/" + method.getName(), peer);
span.setComponent(ComponentsDefine.REDIS);
Tags.DB_TYPE.set(span, "Redis");
SpanLayer.asDB(span);
SpanLayer.asCache(span);
if (allArguments.length > 0 && allArguments[0] instanceof String) {
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
......
......@@ -132,7 +132,7 @@ public class JedisMethodInterceptorTest {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Redis"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}
private Method getMockSetMethod() {
......
......@@ -53,7 +53,7 @@ public abstract class AbstractMessageConsumeInterceptor implements InstanceMetho
AbstractSpan span = ContextManager.createEntrySpan(COMSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);
span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
SpanLayer.asMQ(span);
for (int i = 1; i < msgs.size(); i++) {
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
}
......
......@@ -60,7 +60,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
SpanLayer.asMQ(span);
span.tag("brokerName", (String)allArguments[1]);
span.tag("tags", message.getTags());
span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name());
......
......@@ -40,7 +40,7 @@ public class MemcachedMethodInterceptor implements InstanceMethodsAroundIntercep
AbstractSpan span = ContextManager.createExitSpan(SPY_MEMCACHE + method.getName(), peer);
span.setComponent(ComponentsDefine.MEMCACHED);
Tags.DB_TYPE.set(span, ComponentsDefine.MEMCACHED.getName());
SpanLayer.asDB(span);
SpanLayer.asCache(span);
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
}
......
......@@ -116,7 +116,7 @@ public class MemcachedMethodInterceptorTest {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Memcached"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
MatcherAssert.assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
MatcherAssert.assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}
private Method getMockSetMethod() {
......
......@@ -46,7 +46,7 @@ public class XMemcachedMethodInterceptor implements InstanceMethodsAroundInterce
AbstractSpan span = ContextManager.createExitSpan(XMEMCACHED + method.getName(), peer);
span.setComponent(ComponentsDefine.MEMCACHED);
Tags.DB_TYPE.set(span, ComponentsDefine.MEMCACHED.getName());
SpanLayer.asDB(span);
SpanLayer.asCache(span);
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
}
......
......@@ -119,7 +119,7 @@ public class XMemcachedMethodInterceptorTest {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Memcached"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}
private Method getMockSetMethod() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册