未验证 提交 30ce1959 编写于 作者: G Gao Hongtao 提交者: GitHub

Hot reload gRPC certs of OAP. (#5376)

上级 7ef8b3e4
......@@ -14,10 +14,10 @@ fairly straightforward using `openssl` from the command line.
Use this [script](../../../../tools/TLS/tls_key_generate.sh) if you are not familiar with how to generate key files.
We need below files:
- `server.pem` a private RSA key to sign and authenticate the public key.
- `server.pem` a private RSA key to sign and authenticate the public key. It's either a PKCS#8(PEM) or PKCS#1(DER).
- `server.crt` self-signed X.509 public keys for distribution.
- `ca.crt` a certificate authority public key for a client to validate the server's certificate.
## Config OAP server
You can enable gRPC SSL by add following lines to `application.yml/core/default`.
......@@ -31,6 +31,8 @@ gRPCSslTrustedCAPath: /path/to/ca.crt
`gRPCSslKeyPath` and `gRPCSslCertChainPath` are loaded by OAP server to encrypt the communication. `gRPCSslTrustedCAPath`
helps gRPC client to verify server certificates in cluster mode.
When new files are in place, they can be load dynamically instead of restarting OAP instance.
If you enable `sharding-server` to ingest data from external, add following lines to `application.yml/receiver-sharing-server/default`:
```json
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
......@@ -181,8 +180,8 @@ public class CoreModuleProvider extends ModuleProvider {
if (moduleConfig.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
Paths.get(moduleConfig.getGRPCSslCertChainPath()).toFile(),
Paths.get(moduleConfig.getGRPCSslKeyPath()).toFile()
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath()
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
......@@ -266,9 +265,7 @@ public class CoreModuleProvider extends ModuleProvider {
if (moduleConfig.isGRPCSslEnabled()) {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
Paths.get(moduleConfig.getGRPCSslTrustedCAPath())
.toFile()
);
moduleConfig.getGRPCSslTrustedCAPath());
} else {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
}
......
......@@ -20,9 +20,6 @@ package org.apache.skywalking.oap.server.core.remote.client;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
......@@ -30,11 +27,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
......@@ -43,6 +40,7 @@ import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.server.grpc.ssl.DynamicSslContext;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
......@@ -59,7 +57,7 @@ public class RemoteClientManager implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleDefineHolder moduleDefineHolder;
private SslContext sslContext;
private DynamicSslContext sslContext;
private ClusterNodesQuery clusterNodesQuery;
private volatile List<RemoteClient> usingClients;
private GaugeMetrics gauge;
......@@ -73,13 +71,9 @@ public class RemoteClientManager implements Service {
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
int remoteTimeout,
File trustedCAFile) {
String trustedCAFile) {
this(moduleDefineHolder, remoteTimeout);
try {
sslContext = GrpcSslContexts.forClient().trustManager(trustedCAFile).build();
} catch (SSLException e) {
throw new IllegalArgumentException(e);
}
sslContext = DynamicSslContext.forClient(trustedCAFile);
}
/**
......@@ -96,6 +90,7 @@ public class RemoteClientManager implements Service {
}
public void start() {
Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
}
......
......@@ -29,6 +29,12 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
......
......@@ -18,17 +18,15 @@
package org.apache.skywalking.oap.server.library.server.grpc;
import com.google.common.base.Strings;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
......@@ -37,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.ssl.DynamicSslContext;
@Slf4j
public class GRPCServer implements Server {
......@@ -47,9 +46,9 @@ public class GRPCServer implements Server {
private int maxMessageSize;
private io.grpc.Server server;
private NettyServerBuilder nettyServerBuilder;
private SslContextBuilder sslContextBuilder;
private File certChainFile;
private File privateKeyFile;
private String certChainFile;
private String privateKeyFile;
private DynamicSslContext sslContext;
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4;
private int threadPoolQueueSize = 10000;
......@@ -82,11 +81,10 @@ public class GRPCServer implements Server {
* @param certChainFile `server.crt` file
* @param privateKeyFile `server.pem` file
*/
public GRPCServer(String host, int port, File certChainFile, File privateKeyFile) {
public GRPCServer(String host, int port, String certChainFile, String privateKeyFile) {
this(host, port);
this.certChainFile = certChainFile;
this.privateKeyFile = privateKeyFile;
this.sslContextBuilder = SslContextBuilder.forServer(certChainFile, privateKeyFile);
}
@Override
......@@ -111,6 +109,10 @@ public class GRPCServer implements Server {
nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
.maxInboundMessageSize(maxMessageSize)
.executor(executor);
if (!Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile)) {
sslContext = DynamicSslContext.forServer(privateKeyFile, certChainFile);
nettyServerBuilder.sslContext(sslContext);
}
log.info("Server started, host {} listening on {}", host, port);
}
......@@ -125,11 +127,7 @@ public class GRPCServer implements Server {
@Override
public void start() throws ServerException {
try {
if (sslContextBuilder != null) {
nettyServerBuilder = nettyServerBuilder.sslContext(
GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL)
.build());
}
Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
server = nettyServerBuilder.build();
server.start();
} catch (IOException e) {
......@@ -154,7 +152,7 @@ public class GRPCServer implements Server {
@Override
public boolean isSSLOpen() {
return sslContextBuilder == null;
return !Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.server.grpc.ssl;
import io.grpc.netty.GrpcSslContexts;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.ApplicationProtocolNegotiator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.List;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSessionContext;
import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
/**
* Load SslContext dynamically.
*/
public class DynamicSslContext extends SslContext {
private final MultipleFilesChangeMonitor monitor;
private volatile SslContext ctx;
public static DynamicSslContext forServer(final String privateKeyFile, final String certChainFile) {
return new DynamicSslContext(privateKeyFile, certChainFile);
}
public static DynamicSslContext forClient(final String caFile) {
return new DynamicSslContext(caFile);
}
private DynamicSslContext(final String privateKeyFile, final String certChainFile) {
updateContext(privateKeyFile, certChainFile);
monitor = new MultipleFilesChangeMonitor(
10,
readableContents -> updateContext(privateKeyFile, certChainFile),
certChainFile,
privateKeyFile);
}
private DynamicSslContext(final String caFile) {
updateContext(caFile);
monitor = new MultipleFilesChangeMonitor(
10,
readableContents -> updateContext(caFile),
caFile);
}
private void updateContext(String caFile) {
try {
ctx = GrpcSslContexts.forClient().trustManager(Paths.get(caFile).toFile()).build();
} catch (SSLException e) {
throw new IllegalArgumentException(e);
}
}
private void updateContext(final String privateKeyFile, final String certChainFile) {
try {
ctx = GrpcSslContexts
.configure(SslContextBuilder
.forServer(
new FileInputStream(Paths.get(certChainFile).toFile()),
PrivateKeyUtil.loadDecryptionKey(privateKeyFile)),
SslProvider.OPENSSL)
.build();
} catch (GeneralSecurityException | IOException e) {
throw new IllegalArgumentException(e);
}
}
public void start() {
monitor.start();
}
@Override
public final boolean isClient() {
return ctx.isClient();
}
@Override
public final List<String> cipherSuites() {
return ctx.cipherSuites();
}
@Override
public final long sessionCacheSize() {
return ctx.sessionCacheSize();
}
@Override
public final long sessionTimeout() {
return ctx.sessionTimeout();
}
@Override
public final ApplicationProtocolNegotiator applicationProtocolNegotiator() {
return ctx.applicationProtocolNegotiator();
}
@Override
public final SSLEngine newEngine(ByteBufAllocator alloc) {
return ctx.newEngine(alloc);
}
@Override
public final SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
return ctx.newEngine(alloc, peerHost, peerPort);
}
@Override
public final SSLSessionContext sessionContext() {
return ctx.sessionContext();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.server.grpc.ssl;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.Base64;
/**
* Util intends to parse PKCS#1 and PKCS#8 at same time.
*/
class PrivateKeyUtil {
private static final String PKCS_1_PEM_HEADER = "-----BEGIN RSA PRIVATE KEY-----";
private static final String PKCS_1_PEM_FOOTER = "-----END RSA PRIVATE KEY-----";
private static final String PKCS_8_PEM_HEADER = "-----BEGIN PRIVATE KEY-----";
private static final String PKCS_8_PEM_FOOTER = "-----END PRIVATE KEY-----";
/**
* Load a RSA decryption key from a file (PEM or DER).
*/
static InputStream loadDecryptionKey(String keyFilePath) throws GeneralSecurityException, IOException {
byte[] keyDataBytes = Files.readAllBytes(Paths.get(keyFilePath));
String keyDataString = new String(keyDataBytes, StandardCharsets.UTF_8);
if (keyDataString.contains(PKCS_1_PEM_HEADER)) {
// OpenSSL / PKCS#1 Base64 PEM encoded file
keyDataString = keyDataString.replace(PKCS_1_PEM_HEADER, "");
keyDataString = keyDataString.replace(PKCS_1_PEM_FOOTER, "");
keyDataString = keyDataString.replace("\n", "");
return readPkcs1PrivateKey(Base64.getDecoder().decode(keyDataString));
}
return new ByteArrayInputStream(keyDataString.getBytes());
}
/**
* Create a InputStream instance from raw PKCS#1 bytes. Raw Java API can't recognize ASN.1 format, so we should
* convert it into a pkcs#8 format Java can understand.
*/
private static InputStream readPkcs1PrivateKey(byte[] pkcs1Bytes) throws GeneralSecurityException {
int pkcs1Length = pkcs1Bytes.length;
int totalLength = pkcs1Length + 22;
byte[] pkcs8Header = new byte[] {
0x30, (byte) 0x82, (byte) ((totalLength >> 8) & 0xff), (byte) (totalLength & 0xff), // Sequence + total length
0x2, 0x1, 0x0, // Integer (0)
0x30, 0xD, 0x6, 0x9, 0x2A, (byte) 0x86, 0x48, (byte) 0x86, (byte) 0xF7, 0xD, 0x1, 0x1, 0x1, 0x5, 0x0, // Sequence: 1.2.840.113549.1.1.1, NULL
0x4, (byte) 0x82, (byte) ((pkcs1Length >> 8) & 0xff), (byte) (pkcs1Length & 0xff) // Octet string + length
};
StringBuilder pkcs8 = new StringBuilder(PKCS_8_PEM_HEADER);
pkcs8.append("\n").append(new String(Base64.getEncoder().encode(join(pkcs8Header, pkcs1Bytes))));
pkcs8.append("\n").append(PKCS_8_PEM_FOOTER);
return new ByteArrayInputStream(pkcs8.toString().getBytes());
}
private static byte[] join(byte[] byteArray1, byte[] byteArray2) {
byte[] bytes = new byte[byteArray1.length + byteArray2.length];
System.arraycopy(byteArray1, 0, bytes, 0, byteArray1.length);
System.arraycopy(byteArray2, 0, bytes, byteArray1.length, byteArray2.length);
return bytes;
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.receiver.sharing.server;
import java.nio.file.Paths;
import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -102,8 +101,8 @@ public class SharingServerModuleProvider extends ModuleProvider {
grpcServer = new GRPCServer(
Strings.isBlank(config.getGRPCHost()) ? "0.0.0.0" : config.getGRPCHost(),
config.getGRPCPort(),
Paths.get(config.getGRPCSslCertChainPath()).toFile(),
Paths.get(config.getGRPCSslKeyPath()).toFile()
config.getGRPCSslCertChainPath(),
config.getGRPCSslKeyPath()
);
} else {
grpcServer = new GRPCServer(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册