提交 d322f528 编写于 作者: wu-sheng's avatar wu-sheng 提交者: Xin,Zhang

Support token auth in gRPC upstream (#1000)

* Add client side token.

* Fix ci.

* Fix most codes of token auth at both sides.

* Add settings in config files.

* Make auth interceptor doesn't throw expcetion.

* Update AuthenticationSimpleChecker.java

* Update AuthenticationActivator.java

* Revert "Update AuthenticationActivator.java"

This reverts commit 0935f18fd6802ab84c12e0609c21ef753ff51be3.

* Merge branch 'feature/token-auth' of https://github.com/apache/incubator-skywalking into feature/token-auth

# Conflicts:
#	apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AuthenticationSimpleChecker.java

* Fix Auhentication token works incorrect (#1001)

* Fix Auhentication token works incorrect

* Change way to build GRPC Channel

* Response an empty header when auth fails.
上级 eff39b0f
......@@ -18,16 +18,8 @@
package org.apache.skywalking.apm.collector.agent.grpc.provider;
import java.io.File;
import java.util.Properties;
import org.apache.skywalking.apm.collector.agent.grpc.define.AgentGRPCModule;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ApplicationRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.InstanceDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.JVMMetricsServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.NetworkAddressRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingListener;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
......@@ -42,9 +34,12 @@ import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import org.eclipse.jetty.util.StringUtil;
import java.io.File;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
......@@ -55,6 +50,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
private static final String PORT = "port";
private static final String SSL_CERT_CHAIN_FILEPATH = "ssl_cert_chain_file";
private static final String SSL_PRIVATE_KEY_FILE = "ssl_private_key_file";
private static final String AUTHENTICATION = "authentication";
@Override
public String name() {
......@@ -77,6 +73,8 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer) config.get(PORT);
String sslCertChainFilePath = config.getProperty(SSL_CERT_CHAIN_FILEPATH);
String sslPrivateKeyFilePath = config.getProperty(SSL_PRIVATE_KEY_FILE);
AuthenticationSimpleChecker.INSTANCE.setExpectedToken(config.getProperty(AUTHENTICATION, ""));
File sslCertChainFile = null;
File sslPrivateKeyFile = null;
if (StringUtil.isNotBlank(sslCertChainFilePath)) {
......@@ -103,7 +101,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
namingHandlerRegisterService.register(new AgentGRPCNamingHandler(namingListener));
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer;
GRPCServer gRPCServer;
if (sslCertChainFile != null && sslPrivateKeyFile != null) {
gRPCServer = managerService.createIfAbsent(host, port, sslCertChainFile, sslPrivateKeyFile);
} else {
......@@ -123,12 +121,12 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
return new String[]{ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
}
private void addHandlers(Server gRPCServer) {
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
gRPCServer.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
private void addHandlers(GRPCServer gRPCServer) {
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new ApplicationRegisterServiceHandler(getManager()));
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new InstanceDiscoveryServiceHandler(getManager()));
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new ServiceNameDiscoveryServiceHandler(getManager()));
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new JVMMetricsServiceHandler(getManager()));
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new TraceSegmentServiceHandler(getManager()));
AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new NetworkAddressRegisterServiceHandler(getManager()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider;
import io.grpc.BindableService;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* Active the authentication token checker if expected token exists in application.yml
*
* @author wusheng
*/
public enum AuthenticationSimpleChecker {
INSTANCE;
private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME =
Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);
private String expectedToken = "";
public void build(GRPCServer gRPCServer, BindableService targetService) {
if (StringUtils.isNotEmpty(expectedToken)) {
gRPCServer.addHandler(ServerInterceptors.intercept(targetService, new ServerInterceptor() {
@Override
public <REQ, RESP> ServerCall.Listener<REQ> interceptCall(ServerCall<REQ, RESP> serverCall,
Metadata metadata,
ServerCallHandler<REQ, RESP> next) {
String token = metadata.get(AUTH_HEAD_HEADER_NAME);
if (expectedToken.equals(token)) {
return next.startCall(serverCall, metadata);
} else {
serverCall.close(Status.PERMISSION_DENIED, new Metadata());
return new ServerCall.Listener() {
};
}
}
}));
} else {
gRPCServer.addHandler(targetService);
}
}
public void setExpectedToken(String expectedToken) {
this.expectedToken = expectedToken;
}
}
......@@ -18,13 +18,8 @@
package org.apache.skywalking.apm.collector.agent.jetty.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.agent.jetty.define.AgentJettyModule;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ApplicationRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.InstanceDiscoveryServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.NetworkAddressRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.TraceSegmentServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingListener;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
......@@ -37,7 +32,9 @@ import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import java.util.Properties;
/**
* @author peng-yongsheng
......@@ -77,7 +74,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(jettyServer);
}
......@@ -89,7 +86,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
return new String[] {ClusterModule.NAME, NamingModule.NAME, JettyManagerModule.NAME};
}
private void addHandlers(Server jettyServer) {
private void addHandlers(JettyServer jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler(getManager()));
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
......
......@@ -34,6 +34,9 @@ agent_gRPC:
#Set these two setting to open ssl
#ssl_cert_chain_file: $path
#ssl_private_key_file: $path
#Set your own token to active auth
#authentication: xxxxxx
agent_jetty:
jetty:
host: localhost
......
......@@ -32,8 +32,6 @@ public interface Server {
void start() throws ServerException;
void addHandler(ServerHandler handler);
boolean isSSLOpen();
boolean isStatusEqual(Server target);
......
......@@ -19,13 +19,14 @@
package org.apache.skywalking.apm.collector.server.grpc;
import io.grpc.BindableService;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -103,11 +104,15 @@ public class GRPCServer implements Server {
}
}
@Override
public void addHandler(ServerHandler handler) {
nettyServerBuilder.addService((io.grpc.BindableService) handler);
public void addHandler(BindableService handler) {
nettyServerBuilder.addService(handler);
}
public void addHandler(ServerServiceDefinition definition) {
nettyServerBuilder.addService(definition);
}
@Override
public boolean isSSLOpen() {
return sslContextBuilder == null;
......
......@@ -19,18 +19,17 @@
package org.apache.skywalking.apm.collector.server.jetty;
import java.net.InetSocketAddress;
import java.util.Objects;
import javax.servlet.http.HttpServlet;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlet.ServletMapping;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* @author peng-yongsheng, wusheng
*/
......@@ -50,15 +49,18 @@ public class JettyServer implements Server {
this.contextPath = contextPath;
}
@Override public String hostPort() {
@Override
public String hostPort() {
return host + ":" + port;
}
@Override public String serverClassify() {
@Override
public String serverClassify() {
return "Jetty";
}
@Override public void initialize() throws ServerException {
@Override
public void initialize() throws ServerException {
server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
......@@ -68,10 +70,10 @@ public class JettyServer implements Server {
server.setHandler(servletContextHandler);
}
@Override public void addHandler(ServerHandler handler) {
public void addHandler(JettyHandler handler) {
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet((HttpServlet)handler);
servletContextHandler.addServlet(servletHolder, ((JettyHandler)handler).pathSpec());
servletHolder.setServlet(handler);
servletContextHandler.addServlet(servletHolder, handler.pathSpec());
}
@Override
......@@ -84,7 +86,8 @@ public class JettyServer implements Server {
return equals(target);
}
@Override public void start() throws ServerException {
@Override
public void start() throws ServerException {
logger.info("start server, host: {}, port: {}", host, port);
try {
for (ServletMapping servletMapping : servletContextHandler.getServletHandler().getServletMappings()) {
......
......@@ -20,7 +20,7 @@
package org.apache.skywalking.apm.collector.grpc.manager.service;
import org.apache.skywalking.apm.collector.core.module.Service;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import java.io.File;
......@@ -28,7 +28,7 @@ import java.io.File;
* @author peng-yongsheng, wusheng
*/
public interface GRPCManagerService extends Service {
Server createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException;
GRPCServer createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException;
Server createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException;
GRPCServer createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException;
}
......@@ -19,7 +19,6 @@
package org.apache.skywalking.apm.collector.grpc.manager.service;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import org.slf4j.Logger;
......@@ -42,16 +41,16 @@ public class GRPCManagerServiceImpl implements GRPCManagerService {
}
@Override
public Server createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException {
public GRPCServer createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException {
return createOrChooseServer(host, port, new GRPCServer(host, port));
}
@Override
public Server createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException {
public GRPCServer createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException {
return createOrChooseServer(host, port, new GRPCServer(host, port, certChainFile, privateKeyFile));
}
private Server createOrChooseServer(String host, int port, GRPCServer newServer) throws ServerCanNotBeCreatedException {
private GRPCServer createOrChooseServer(String host, int port, GRPCServer newServer) throws ServerCanNotBeCreatedException {
String id = host + String.valueOf(port);
GRPCServer existServer = servers.get(id);
if (existServer != null) {
......
......@@ -20,14 +20,14 @@
package org.apache.skywalking.apm.collector.jetty.manager.service;
import org.apache.skywalking.apm.collector.core.module.Service;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author peng-yongsheng
*/
public interface JettyManagerService extends Service {
Server createIfAbsent(String host, int port, String contextPath);
JettyServer createIfAbsent(String host, int port, String contextPath);
void addHandler(String host, int port, ServerHandler serverHandler);
void addHandler(String host, int port, JettyHandler serverHandler);
}
......@@ -19,15 +19,15 @@
package org.apache.skywalking.apm.collector.jetty.manager.service;
import java.util.Map;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author peng-yongsheng
*/
......@@ -41,7 +41,7 @@ public class JettyManagerServiceImpl implements JettyManagerService {
this.servers = servers;
}
@Override public Server createIfAbsent(String host, int port, String contextPath) {
@Override public JettyServer createIfAbsent(String host, int port, String contextPath) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
return servers.get(id);
......@@ -57,7 +57,7 @@ public class JettyManagerServiceImpl implements JettyManagerService {
}
}
@Override public void addHandler(String host, int port, ServerHandler serverHandler) {
@Override public void addHandler(String host, int port, JettyHandler serverHandler) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
servers.get(id).addHandler(serverHandler);
......
......@@ -20,10 +20,11 @@
package org.apache.skywalking.apm.collector.naming.jetty.service;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,8 +45,12 @@ public class NamingJettyHandlerRegisterService implements NamingHandlerRegisterS
this.port = port;
}
@Override public void register(ServerHandler namingHandler) {
@Override
public void register(ServerHandler namingHandler) {
if (!(namingHandler instanceof JettyHandler)) {
throw new IllegalArgumentException("NamingJettyHandlerRegisterService support JettyHandler only.");
}
JettyManagerService managerService = moduleManager.find(JettyManagerModule.NAME).getService(JettyManagerService.class);
managerService.addHandler(this.host, this.port, namingHandler);
managerService.addHandler(this.host, this.port, (JettyHandler)namingHandler);
}
}
......@@ -19,22 +19,23 @@
package org.apache.skywalking.apm.collector.remote.grpc;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler;
import org.apache.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService;
import org.apache.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import java.util.Properties;
/**
* @author peng-yongsheng
......@@ -76,7 +77,7 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
GRPCServer gRPCServer = managerService.createIfAbsent(host, port);
gRPCServer.addHandler(new RemoteCommonServiceHandler(remoteDataRegisterService));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.ui.jetty;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
......@@ -30,13 +29,15 @@ import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.ui.UIModule;
import org.apache.skywalking.apm.collector.ui.jetty.handler.GraphQLHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingListener;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
......@@ -74,7 +75,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new UIJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(jettyServer);
}
......@@ -86,7 +87,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
return new String[] {ClusterModule.NAME, JettyManagerModule.NAME, NamingModule.NAME, CacheModule.NAME, StorageModule.NAME};
}
private void addHandlers(Server jettyServer) {
private void addHandlers(JettyServer jettyServer) {
jettyServer.addHandler(new GraphQLHandler(getManager()));
}
}
......@@ -42,6 +42,12 @@ public class Config {
*/
public static String APPLICATION_CODE = "";
/**
* Authentication active is based on backend setting, see application.yml for more details.
* For most scenarios, this needs backend extensions, only basic match auth provided in default implementation.
*/
public static String AUTHENTICATION = "";
/**
* Negative or zero means off, by default. {@link #SAMPLE_N_PER_3_SECS} means sampling N {@link TraceSegment} in
* 10 seconds tops.
......
......@@ -18,12 +18,7 @@
package org.apache.skywalking.apm.agent.core.jvm;
import io.grpc.ManagedChannel;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import io.grpc.Channel;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
......@@ -44,6 +39,12 @@ import org.apache.skywalking.apm.network.proto.JVMMetrics;
import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* The <code>JVMService</code> represents a timer,
* which collectors JVM cpu, memory, memorypool and gc info,
......@@ -149,7 +150,7 @@ public class JVMService implements BootService, Runnable {
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
}
this.status = status;
......
......@@ -18,11 +18,7 @@
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import io.grpc.Channel;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
......@@ -37,17 +33,14 @@ import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @author wusheng
*/
......@@ -66,7 +59,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.StringUtil;
/**
* Active authentication header by Config.Agent.AUTHENTICATION
*
* @author wu-sheng, zhang xin
*/
public class AuthenticationDecorator implements ChannelDecorator {
private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME =
Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);
@Override
public Channel build(Channel channel) {
if (StringUtil.isEmpty(Config.Agent.AUTHENTICATION)) {
return channel;
}
return ClientInterceptors.intercept(channel, new ClientInterceptor() {
@Override
public <REQ, RESP> ClientCall<REQ, RESP> interceptCall(MethodDescriptor<REQ, RESP> method,
CallOptions options, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<REQ, RESP>(channel.newCall(method, options)) {
@Override
public void start(Listener<RESP> responseListener, Metadata headers) {
headers.put(AUTH_HEAD_HEADER_NAME, Config.Agent.AUTHENTICATION);
super.start(responseListener, headers);
}
};
}
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannelBuilder;
/**
* @author zhang xin
*/
public interface ChannelBuilder<B extends ManagedChannelBuilder> {
B build(B managedChannelBuilder) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
/**
* @author zhang xin
*/
public interface ChannelDecorator {
Channel build(Channel channel);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import java.util.LinkedList;
import java.util.List;
/**
* @author zhangxin
*/
public class GRPCChannel {
/**
* origin channel
*/
private final ManagedChannel originChannel;
private final Channel channelWithDecorators;
private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
List<ChannelDecorator> decorators) throws Exception {
ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
for (ChannelBuilder builder : channelBuilders) {
channelBuilder = builder.build(channelBuilder);
}
this.originChannel = channelBuilder.build();
Channel channel = originChannel;
for (ChannelDecorator decorator : decorators) {
channel = decorator.build(channel);
}
channelWithDecorators = channel;
}
public static Builder newBuilder(String host, int port) {
return new Builder(host, port);
}
public Channel getChannel() {
return this.channelWithDecorators;
}
public boolean isTerminated() {
return originChannel.isTerminated();
}
public void shutdownNow() {
originChannel.shutdownNow();
}
public boolean isShutdown() {
return originChannel.isShutdown();
}
public static class Builder {
private final String host;
private final int port;
private final List<ChannelBuilder> channelBuilders;
private final List<ChannelDecorator> decorators;
private Builder(String host, int port) {
this.host = host;
this.port = port;
this.channelBuilders = new LinkedList<ChannelBuilder>();
this.decorators = new LinkedList<ChannelDecorator>();
}
public Builder addChannelDecorator(ChannelDecorator interceptor) {
this.decorators.add(interceptor);
return this;
}
public GRPCChannel build() throws Exception {
return new GRPCChannel(host, port, channelBuilders, decorators);
}
public Builder addManagedChannelBuilder(ChannelBuilder builder) {
channelBuilders.add(builder);
return this;
}
}
}
......@@ -16,15 +16,11 @@
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -32,22 +28,21 @@ import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* @author wusheng
* @author wusheng, zhang xin
*/
public class GRPCChannelManager implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(GRPCChannelManager.class);
private volatile ManagedChannel managedChannel = null;
private volatile GRPCChannel managedChannel = null;
private volatile ScheduledFuture<?> connectCheckFuture;
private volatile boolean reconnect = true;
private Random random = new Random();
......@@ -61,13 +56,13 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......@@ -94,14 +89,13 @@ public class GRPCChannelManager implements BootService, Runnable {
int index = Math.abs(random.nextInt()) % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
String[] ipAndPort = server.split(":");
NettyChannelBuilder channelBuilder =
new TLSChannelBuilder(
NettyChannelBuilder.forAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.nameResolverFactory(new DnsNameResolverProvider())
.maxInboundMessageSize(1024 * 1024 * 50)
.usePlaintext(true)
).buildTLS();
managedChannel = channelBuilder.build();
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AuthenticationDecorator())
.build();
if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
......@@ -123,8 +117,8 @@ public class GRPCChannelManager implements BootService, Runnable {
listeners.add(listener);
}
public ManagedChannel getManagedChannel() {
return managedChannel;
public Channel getChannel() {
return managedChannel.getChannel();
}
/**
......@@ -150,13 +144,13 @@ public class GRPCChannelManager implements BootService, Runnable {
private boolean isNetworkError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;
StatusRuntimeException statusRuntimeException = (StatusRuntimeException)throwable;
return statusEquals(statusRuntimeException.getStatus(),
Status.UNAVAILABLE,
Status.PERMISSION_DENIED,
Status.UNAUTHENTICATED,
Status.RESOURCE_EXHAUSTED,
Status.UNKNOWN
Status.UNAVAILABLE,
Status.PERMISSION_DENIED,
Status.UNAUTHENTICATED,
Status.RESOURCE_EXHAUSTED,
Status.UNKNOWN
);
}
return false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannelBuilder;
import io.grpc.internal.DnsNameResolverProvider;
/**
* @author zhang xin
*/
public class StandardChannelBuilder implements ChannelBuilder {
private final static int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50;
private final static boolean USE_PLAIN_TEXT = true;
@Override public ManagedChannelBuilder build(ManagedChannelBuilder managedChannelBuilder) throws Exception {
return managedChannelBuilder.nameResolverFactory(new DnsNameResolverProvider())
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.usePlaintext(USE_PLAIN_TEXT);
}
}
......@@ -22,42 +22,29 @@ import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.File;
import javax.net.ssl.SSLException;
import org.apache.skywalking.apm.agent.core.boot.AgentPackageNotFoundException;
import org.apache.skywalking.apm.agent.core.boot.AgentPackagePath;
import org.apache.skywalking.apm.agent.core.conf.Constants;
import javax.net.ssl.SSLException;
import java.io.File;
/**
* Detect the `/ca` folder in agent package, if `ca.crt` exists, start TLS (no mutual auth).
*
* @author wusheng
*/
public class TLSChannelBuilder {
public class TLSChannelBuilder implements ChannelBuilder<NettyChannelBuilder> {
private static String CA_FILE_NAME = "ca" + Constants.PATH_SEPARATOR + "ca.crt";
private NettyChannelBuilder nettyChannelBuilder;
public TLSChannelBuilder(NettyChannelBuilder nettyChannelBuilder) {
this.nettyChannelBuilder = nettyChannelBuilder;
}
/**
* Build a TLS supported channel is necessary.
*
* @return chanel builder
* @throws AgentPackageNotFoundException
* @throws SSLException
*/
NettyChannelBuilder buildTLS() throws AgentPackageNotFoundException, SSLException {
@Override public NettyChannelBuilder build(
NettyChannelBuilder managedChannelBuilder) throws AgentPackageNotFoundException, SSLException {
File caFile = new File(AgentPackagePath.getPath(), CA_FILE_NAME);
if (caFile.exists() && caFile.isFile()) {
SslContextBuilder builder = GrpcSslContexts.forClient();
builder.trustManager(caFile);
nettyChannelBuilder = nettyChannelBuilder.negotiationType(NegotiationType.TLS)
.sslContext(builder.build());
managedChannelBuilder = managedChannelBuilder.negotiationType(NegotiationType.TLS)
.sslContext(builder.build());
}
return nettyChannelBuilder;
return managedChannelBuilder;
}
}
......@@ -19,23 +19,24 @@
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.proto.Downstream;
import org.apache.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import java.util.List;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
......@@ -169,7 +170,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
serviceStub = TraceSegmentServiceGrpc.newStub(channel);
}
this.status = status;
......
......@@ -21,6 +21,9 @@ agent.application_code=Your_ApplicationName
# Negative number means sample traces as many as possible, most likely 100%
# agent.sample_n_per_3_secs=-1
# Authentication active is based on backend setting, see application.yml for more details.
# agent.authentication = xxxx
# The max amount of spans in a single segment.
# Through this config item, skywalking keep your application memory cost estimated.
# agent.span_limit_per_segment=300
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册