提交 0569689b 编写于 作者: R Rajan 提交者: GitHub

Pass brokerServiceUrl to websocket service configuration (#166)

上级 8818eeaa
......@@ -19,6 +19,10 @@
# Global Zookeeper quorum connection string
globalZookeeperServers=
// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
serviceUrl=
serviceUrlTls=
# Port to use to server HTTP request
webServicePort=8080
......@@ -39,6 +43,10 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
......@@ -39,6 +39,7 @@ import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
public class ProxyAuthenticationTest extends ProducerConsumerBase {
......@@ -56,7 +57,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setAuthenticationEnabled(true);
......
......@@ -29,7 +29,6 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.authorization.AuthorizationManager;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
......@@ -38,6 +37,7 @@ import com.yahoo.pulsar.common.policies.data.AuthAction;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.common.policies.data.PropertyAdmin;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
private WebSocketService service;
......@@ -53,8 +53,10 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
conf.setClusterName("c1");
internalSetup();
ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Set<String> superUser = Sets.newHashSet("");
config.setAuthorizationEnabled(true);
config.setGlobalZookeeperServers("dummy-zk-servers");
config.setSuperUserRoles(superUser);
config.setClusterName("c1");
config.setWebServicePort(TEST_PORT);
......
......@@ -32,10 +32,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
public class ProxyPublishConsumeTest extends ProducerConsumerBase {
......@@ -51,9 +51,10 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
......
......@@ -40,10 +40,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
......@@ -64,13 +64,14 @@ public class ProxyPublishConsumeTls extends ProducerConsumerBase {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setWebServicePortTls(TLS_TEST_PORT);
config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.websocket.proxy;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final int TEST_PORT = 6080;
private ProxyServer proxyServer;
private WebSocketService service;
@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setServiceUrl(pulsar.getWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
}
@Test
public void socketTest() throws Exception {
URI consumeUri = URI.create(CONSUME_URI);
URI produceUri = URI.create(PRODUCE_URI);
WebSocketClient consumeClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());
consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} finally {
try {
consumeClient.stop();
produceClient.stop();
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeWithoutZKTest.class);
}
......@@ -26,6 +26,7 @@ import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,6 +40,7 @@ import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
......@@ -68,13 +70,11 @@ public class WebSocketService implements Closeable {
private ClusterData localCluster;
public WebSocketService(ServiceConfiguration config) throws PulsarClientException, MalformedURLException,
ServletException, DeploymentException, PulsarServerException {
this(null, config);
public WebSocketService(WebSocketProxyConfiguration config) {
this(createClusterData(config), createServiceConfiguration(config));
}
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) throws PulsarClientException,
MalformedURLException, ServletException, DeploymentException, PulsarServerException {
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
this.localCluster = localCluster;
}
......@@ -82,20 +82,29 @@ public class WebSocketService implements Closeable {
public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
DeploymentException {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
throw new PulsarServerException(e);
if (isNotBlank(config.getGlobalZookeeperServers())) {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
throw new PulsarServerException(e);
}
this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
log.info("Global Zookeeper cache started");
}
this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
log.info("Global Zookeeper cache started");
// start authorizationManager
if (config.isAuthorizationEnabled()) {
if (configurationCacheService == null) {
throw new PulsarServerException(
"Failed to initialize authorization manager due to empty GlobalZookeeperServers");
}
authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
log.info("Pulsar WebSocket Service started");
}
......@@ -163,7 +172,37 @@ public class WebSocketService implements Closeable {
}
}
private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls());
} else {
return null;
}
}
private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) {
ServiceConfiguration serviceConfig = new ServiceConfiguration();
serviceConfig.setClusterName(config.getClusterName());
serviceConfig.setWebServicePort(config.getWebServicePort());
serviceConfig.setWebServicePortTls(config.getWebServicePortTls());
serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders());
serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
serviceConfig.setSuperUserRoles(config.getSuperUserRoles());
serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers());
serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
serviceConfig.setTlsEnabled(config.isTlsEnabled());
serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
return serviceConfig;
}
private ClusterData retrieveClusterData() throws PulsarServerException {
if (configurationCacheService == null) {
throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers");
}
try {
String path = "/admin/clusters/" + config.getClusterName();
return localCluster = configurationCacheService.clustersCache().get(path)
......
......@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.util.SecurityUtility;
......@@ -54,9 +53,9 @@ public class ProxyServer {
private final ExecutorService executorService;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final ServiceConfiguration conf;
private final WebSocketProxyConfiguration conf;
public ProxyServer(ServiceConfiguration config)
public ProxyServer(WebSocketProxyConfiguration config)
throws PulsarClientException, MalformedURLException, PulsarServerException {
this.conf = config;
this.executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(),
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.websocket.service;
import java.util.Set;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.FieldContext;
public class WebSocketProxyConfiguration {
// Name of the cluster to which this broker belongs to
@FieldContext(required = true)
private String clusterName;
// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
private String serviceUrl;
private String serviceUrlTls;
// Global Zookeeper quorum connection string
private String globalZookeeperServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// Port to use to server HTTP request
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
// Hostname or IP address the service binds on, default is 0.0.0.0.
private String bindAddress;
// --- Authentication ---
// Enable authentication
private boolean authenticationEnabled;
// Autentication provider name list, which is a list of class names
private Set<String> authenticationProviders = Sets.newTreeSet();
// Enforce authorization
private boolean authorizationEnabled;
// Role names that are treated as "super-user", meaning they will be able to
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();
// Authentication settings of the proxy itself. Used to connect to brokers
private String brokerClientAuthenticationPlugin;
private String brokerClientAuthenticationParameters;
/***** --- TLS --- ****/
// Enable TLS
private boolean tlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
// Path for the TLS private key file
private String tlsKeyFilePath;
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public String getServiceUrl() {
return serviceUrl;
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
public String getServiceUrlTls() {
return serviceUrlTls;
}
public void setServiceUrlTls(String serviceUrlTls) {
this.serviceUrlTls = serviceUrlTls;
}
public String getGlobalZookeeperServers() {
return globalZookeeperServers;
}
public void setGlobalZookeeperServers(String globalZookeeperServers) {
this.globalZookeeperServers = globalZookeeperServers;
}
public long getZooKeeperSessionTimeoutMillis() {
return zooKeeperSessionTimeoutMillis;
}
public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis) {
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}
public int getWebServicePort() {
return webServicePort;
}
public void setWebServicePort(int webServicePort) {
this.webServicePort = webServicePort;
}
public int getWebServicePortTls() {
return webServicePortTls;
}
public void setWebServicePortTls(int webServicePortTls) {
this.webServicePortTls = webServicePortTls;
}
public String getBindAddress() {
return bindAddress;
}
public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}
public boolean isAuthenticationEnabled() {
return authenticationEnabled;
}
public void setAuthenticationEnabled(boolean authenticationEnabled) {
this.authenticationEnabled = authenticationEnabled;
}
public void setAuthenticationProviders(Set<String> providersClassNames) {
authenticationProviders = providersClassNames;
}
public Set<String> getAuthenticationProviders() {
return authenticationProviders;
}
public boolean isAuthorizationEnabled() {
return authorizationEnabled;
}
public void setAuthorizationEnabled(boolean authorizationEnabled) {
this.authorizationEnabled = authorizationEnabled;
}
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
public void setSuperUserRoles(Set<String> superUserRoles) {
this.superUserRoles = superUserRoles;
}
public String getBrokerClientAuthenticationPlugin() {
return brokerClientAuthenticationPlugin;
}
public void setBrokerClientAuthenticationPlugin(String brokerClientAuthenticationPlugin) {
this.brokerClientAuthenticationPlugin = brokerClientAuthenticationPlugin;
}
public String getBrokerClientAuthenticationParameters() {
return brokerClientAuthenticationParameters;
}
public void setBrokerClientAuthenticationParameters(String brokerClientAuthenticationParameters) {
this.brokerClientAuthenticationParameters = brokerClientAuthenticationParameters;
}
public boolean isTlsEnabled() {
return tlsEnabled;
}
public void setTlsEnabled(boolean tlsEnabled) {
this.tlsEnabled = tlsEnabled;
}
public String getTlsCertificateFilePath() {
return tlsCertificateFilePath;
}
public void setTlsCertificateFilePath(String tlsCertificateFilePath) {
this.tlsCertificateFilePath = tlsCertificateFilePath;
}
public String getTlsKeyFilePath() {
return tlsKeyFilePath;
}
public void setTlsKeyFilePath(String tlsKeyFilePath) {
this.tlsKeyFilePath = tlsKeyFilePath;
}
}
......@@ -16,12 +16,18 @@
package com.yahoo.pulsar.websocket.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.common.util.FieldParser.update;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.ServiceConfigurationLoader;
import com.yahoo.pulsar.websocket.WebSocketConsumerServlet;
import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
......@@ -34,7 +40,7 @@ public class WebSocketServiceStarter {
// load config file and start proxy service
String configFile = args[0];
log.info("Loading configuration from {}", configFile);
ServiceConfiguration config = ServiceConfigurationLoader.create(configFile);
WebSocketProxyConfiguration config = load(configFile);
ProxyServer proxyServer = new ProxyServer(config);
WebSocketService service = new WebSocketService(config);
start(proxyServer, service);
......@@ -50,6 +56,23 @@ public class WebSocketServiceStarter {
proxyServer.start();
service.start();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static WebSocketProxyConfiguration load(String configFile) throws IOException, IllegalArgumentException {
final InputStream inStream = new FileInputStream(configFile);
try {
checkNotNull(inStream, "Unbable to read config file " + configFile);
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Properties properties = new Properties();
properties.load(inStream);
update((Map) properties, config);
return config;
} finally {
if (inStream != null) {
inStream.close();
}
}
}
private static final Logger log = LoggerFactory.getLogger(WebSocketServiceStarter.class);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册