ProxyConfiguration.java 17.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.proxy.server;

21 22
import com.google.common.collect.Sets;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PropertiesContext;
import org.apache.pulsar.common.configuration.PropertyContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.sasl.SaslConstants;
42

43 44
@Getter
@Setter
45
public class ProxyConfiguration implements PulsarConfiguration {
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
    @Category
    private static final String CATEGORY_SERVER = "Server";
    @Category
    private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
    @Category
    private static final String CATEGORY_AUTHENTICATION = "Proxy Authentication";
    @Category
    private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization";
    @Category(
        description = "the settings are for configuring how proxies authenticates with Pulsar brokers"
    )
    private static final String CATEGORY_CLIENT_AUTHENTICATION = "Broker Client Authorization";
    @Category
    private static final String CATEGORY_RATE_LIMITING = "RateLimiting";
    @Category
    private static final String CATEGORY_TLS = "TLS";
    @Category
    private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
    @Category
    private static final String CATEGORY_HTTP = "HTTP";
66 67
    @Category
    private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
68 69 70 71 72

    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The ZooKeeper quorum connection string (as a comma-separated list)"
    )
73
    private String zookeeperServers;
74 75 76 77 78 79 80 81 82
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "Configuration store connection string (as a comma-separated list)"
    )
    private String configurationStoreServers;
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "Global ZooKeeper quorum connection string (as a comma-separated list)"
    )
83
    @Deprecated
84 85
    private String globalZookeeperServers;

86 87 88 89
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "ZooKeeper session timeout (in milliseconds)"
    )
90
    private int zookeeperSessionTimeoutMs = 30_000;
91

92 93 94 95
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The service url points to the broker cluster"
    )
96
    private String brokerServiceURL;
97 98 99 100
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The tls service url points to the broker cluster"
    )
101
    private String brokerServiceURLTLS;
102

103 104 105 106
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The web service url points to the broker cluster"
    )
107
    private String brokerWebServiceURL;
108 109 110 111
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The tls web service url points to the broker cluster"
    )
112 113
    private String brokerWebServiceURLTLS;

114 115 116 117 118
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The web service url points to the function worker cluster."
            + " Only configure it when you setup function workers in a separate cluster"
    )
119
    private String functionWorkerWebServiceURL;
120 121 122 123 124
    @FieldContext(
        category = CATEGORY_BROKER_DISCOVERY,
        doc = "The tls web service url points to the function worker cluster."
            + " Only configure it when you setup function workers in a separate cluster"
    )
125 126
    private String functionWorkerWebServiceURLTLS;

127 128 129 130
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The port for serving binary protobuf request"
    )
131
    private Optional<Integer> servicePort = Optional.ofNullable(6650);
132 133 134 135
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The port for serving tls secured binary protobuf request"
    )
136
    private Optional<Integer> servicePortTls = Optional.empty();
137

138 139 140 141
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The port for serving http requests"
    )
142
    private Optional<Integer> webServicePort = Optional.ofNullable(8080);
143 144 145 146
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The port for serving https requests"
    )
147
    private Optional<Integer> webServicePortTls = Optional.empty();
148

S
Samuel 已提交
149 150 151 152 153 154 155 156 157
    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "Proxy log level, default is 0."
                    + " 0: Do not log any tcp channel info"
                    + " 1: Parse and log any tcp channel info and command info without message body"
                    + " 2: Parse and log channel info, command info and message body"
    )
    private Integer proxyLogLevel = 0;

158 159 160 161 162
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Path for the file used to determine the rotation status for the proxy instance"
            + " when responding to service discovery health checks"
    )
163
    private String statusFilePath;
164

165 166 167 168 169 170
    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "A list of role names (a comma-separated list of strings) that are treated as"
            + " `super-user`, meaning they will be able to do all admin operations and publish"
            + " & consume from all topics"
    )
171 172
    private Set<String> superUserRoles = Sets.newTreeSet();

173 174 175 176
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Whether authentication is enabled for the Pulsar proxy"
    )
177
    private boolean authenticationEnabled = false;
178 179 180 181
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Authentication provider name list (a comma-separated list of class names"
    )
182
    private Set<String> authenticationProviders = Sets.newTreeSet();
183 184 185 186
    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Whether authorization is enforced by the Pulsar proxy"
    )
187
    private boolean authorizationEnabled = false;
188 189 190 191
    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Authorization provider as a fully qualified class name"
    )
192
    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
193 194 195 196 197 198
    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Whether client authorization credentials are forwarded to the broker for re-authorization."
            + "Authentication must be enabled via configuring `authenticationEnabled` to be true for this"
            + "to take effect"
    )
J
Jai Asher 已提交
199
    private boolean forwardAuthorizationCredentials = false;
200 201 202 203 204 205
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
            + "'authenticationEnabled' must also be set for this to take effect."
    )
    private boolean authenticateMetricsEndpoint = true;
206

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
            + " Default value is: \".*pulsar.*\", so only clients whose id contains 'pulsar' are allowed to connect."
    )
    private String saslJaasClientAllowedIds = SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "Service Principal, for login context name. Default value is \"PulsarProxy\"."
    )
    private String saslJaasServerSectionName = SaslConstants.JAAS_DEFAULT_PROXY_SECTION_NAME;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "kerberos kinit command."
    )
    private String kinitCommand = "/usr/bin/kinit";


228 229 230 231
    @FieldContext(
        category = CATEGORY_RATE_LIMITING,
        doc = "Max concurrent inbound connections. The proxy will reject requests beyond that"
    )
232 233
    private int maxConcurrentInboundConnections = 10000;

234 235 236 237
    @FieldContext(
        category = CATEGORY_RATE_LIMITING,
        doc = "Max concurrent lookup requests. The proxy will reject requests beyond that"
    )
238
    private int maxConcurrentLookupRequests = 50000;
239

240 241 242 243
    @FieldContext(
        category = CATEGORY_CLIENT_AUTHENTICATION,
        doc = "The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers"
    )
244
    private String brokerClientAuthenticationPlugin;
245 246 247 248
    @FieldContext(
        category = CATEGORY_CLIENT_AUTHENTICATION,
        doc = "The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers"
    )
249
    private String brokerClientAuthenticationParameters;
250 251 252 253
    @FieldContext(
        category = CATEGORY_CLIENT_AUTHENTICATION,
        doc = "The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers"
    )
254
    private String brokerClientTrustCertsFilePath;
255

256 257 258 259 260 261
    @FieldContext(
        category = CATEGORY_CLIENT_AUTHENTICATION,
        doc = "Whether TLS is enabled when communicating with Pulsar brokers"
    )
    private boolean tlsEnabledWithBroker = false;

262 263 264 265 266 267
    @FieldContext(
            category = CATEGORY_AUTHORIZATION,
            doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
    )
    private String anonymousUserRole = null;

268
    /***** --- TLS --- ****/
269

270
    @Deprecated
271
    private boolean tlsEnabledInProxy = false;
272 273
    @FieldContext(
        category = CATEGORY_TLS,
274
        doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)"
275
    )
276
    private long tlsCertRefreshCheckDurationSec = 300; // 5 mins
277 278 279 280
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the TLS certificate file"
    )
281
    private String tlsCertificateFilePath;
282 283 284 285
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the TLS private key file"
    )
286
    private String tlsKeyFilePath;
287 288 289 290 291 292 293
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the trusted TLS certificate file.\n\n"
            + "This cert is used to verify that any certs presented by connecting clients"
            + " are signed by a certificate authority. If this verification fails, then the"
            + " certs are untrusted and the connections are dropped"
    )
294
    private String tlsTrustCertsFilePath;
295 296 297 298 299 300 301
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Accept untrusted TLS certificate from client.\n\n"
            + "If true, a client with a cert which cannot be verified with the `tlsTrustCertsFilePath`"
            + " cert will be allowed to connect to the server, though the cert will not be used for"
            + " client authentication"
    )
302
    private boolean tlsAllowInsecureConnection = false;
303 304 305 306
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Whether the hostname is validated when the proxy creates a TLS connection with brokers"
    )
307
    private boolean tlsHostnameVerificationEnabled = false;
308 309 310 311 312 313
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
            + " (a comma-separated list of protocol names).\n\n"
            + "Examples:- [TLSv1.2, TLSv1.1, TLSv1]"
    )
314
    private Set<String> tlsProtocols = Sets.newTreeSet();
315 316 317 318 319 320
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake"
            + " (a comma-separated list of ciphers).\n\n"
            + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
    )
321
    private Set<String> tlsCiphers = Sets.newTreeSet();
322 323 324 325 326
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Whether client certificates are required for TLS.\n\n"
            + " Connections are rejected if the client certificate isn't trusted"
    )
327
    private boolean tlsRequireTrustedClientCertOnConnect = false;
328

329 330 331 332
    @FieldContext(
        category = CATEGORY_HTTP,
        doc = "Http directs to redirect to non-pulsar services"
    )
333 334
    private Set<HttpReverseProxyConfig> httpReverseProxyConfigs = Sets.newHashSet();

335 336 337 338 339 340 341 342 343 344 345
    @FieldContext(
        minValue = 1,
        category = CATEGORY_HTTP,
        doc = "Http output buffer size.\n\n"
            + "The amount of data that will be buffered for http requests "
            + "before it is flushed to the channel. A larger buffer size may "
            + "result in higher http throughput though it may take longer for "
            + "the client to see data. If using HTTP streaming via the reverse "
            + "proxy, this should be set to the minimum value, 1, so that clients "
            + "see the data as soon as possible."
    )
346 347
    private int httpOutputBufferSize = 32*1024;

348 349 350 351 352
    @FieldContext(
           minValue = 1,
           category = CATEGORY_HTTP,
           doc = "Number of threads to use for HTTP requests processing"
    )
353
    private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors());
354

355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
    @PropertiesContext(
        properties = {
            @PropertyContext(
                key = "tokenPublicKey",
                doc = @FieldContext(
                    category = CATEGORY_TOKEN_AUTH,
                    doc = "Asymmetric public/private key pair.\n\n"
                        + "Configure the public key to be used to validate auth tokens"
                        + " The key can be specified like:\n\n"
                        + "tokenPublicKey=data:base64,xxxxxxxxx\n"
                        + "tokenPublicKey=file:///my/public.key")
            ),
            @PropertyContext(
                key = "tokenSecretKey",
                doc = @FieldContext(
                    category = CATEGORY_TOKEN_AUTH,
                    doc = "Symmetric key.\n\n"
                        + "Configure the secret key to be used to validate auth tokens"
                        + "The key can be specified like:\n\n"
                        + "tokenSecretKey=data:base64,xxxxxxxxx\n"
                        + "tokenSecretKey=file:///my/secret.key")
            )
        }
    )
379 380 381 382 383 384
    private Properties properties = new Properties();

    public Properties getProperties() {
        return properties;
    }

385
    public Optional<Integer> getServicePort() {
386
        return servicePort;
387 388
    }

S
Samuel 已提交
389 390 391 392 393 394 395
    public Optional<Integer> getproxyLogLevel() {
        return Optional.ofNullable(proxyLogLevel);
    }
    public void setProxyLogLevel(int proxyLogLevel) {
        this.proxyLogLevel = proxyLogLevel;
    }

396
    public Optional<Integer> getServicePortTls() {
397
        return servicePortTls;
398
    }
399

400
    public Optional<Integer> getWebServicePort() {
401
        return webServicePort;
402
    }
403

404
    public Optional<Integer> getWebServicePortTls() {
405
        return webServicePortTls;
406
    }
407

408 409
    public void setProperties(Properties properties) {
        this.properties = properties;
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432

        Map<String, Map<String, String>> redirects = new HashMap<>();
        Pattern redirectPattern = Pattern.compile("^httpReverseProxy\\.([^\\.]*)\\.(.+)$");
        Map<String, List<Matcher>> groups = properties.stringPropertyNames().stream()
            .map((s) -> redirectPattern.matcher(s))
            .filter(Matcher::matches)
            .collect(Collectors.groupingBy((m) -> m.group(1))); // group by name

        groups.entrySet().forEach((e) -> {
                Map<String, String> keyToFullKey = e.getValue().stream().collect(
                        Collectors.toMap(m -> m.group(2), m -> m.group(0)));
                if (!keyToFullKey.containsKey("path")) {
                    throw new IllegalArgumentException(
                            String.format("httpReverseProxy.%s.path must be specified exactly once", e.getKey()));
                }
                if (!keyToFullKey.containsKey("proxyTo")) {
                    throw new IllegalArgumentException(
                            String.format("httpReverseProxy.%s.proxyTo must be specified exactly once", e.getKey()));
                }
                httpReverseProxyConfigs.add(new HttpReverseProxyConfig(e.getKey(),
                                                    properties.getProperty(keyToFullKey.get("path")),
                                                    properties.getProperty(keyToFullKey.get("proxyTo"))));
            });
433
    }
434

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
    public static class HttpReverseProxyConfig {
        private final String name;
        private final String path;
        private final String proxyTo;

        HttpReverseProxyConfig(String name, String path, String proxyTo) {
            this.name = name;
            this.path = path;
            this.proxyTo = proxyTo;
        }

        public String getName() {
            return name;
        }

        public String getPath() {
            return path;
        }

        public String getProxyTo() {
            return proxyTo;
        }

        @Override
        public String toString() {
            return String.format("HttpReverseProxyConfig(%s, path=%s, proxyTo=%s)", name, path, proxyTo);
        }
    }
463
}