提交 4d685b51 编写于 作者: M Matteo Merli 提交者: GitHub

Use local broker address as serviceURL when running WebSocket component embedded in broker (#82)

Fixes #77
上级 b33b2a19
......@@ -16,9 +16,7 @@
package com.yahoo.pulsar.broker;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
......@@ -58,6 +56,7 @@ import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.websocket.WebSocketConsumerServlet;
import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
......@@ -252,7 +251,9 @@ public class PulsarService implements AutoCloseable {
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);
if (config.isWebSocketServiceEnabled()) {
this.webSocketService = new WebSocketService(config);
// Use local broker address to avoid different IP address when using a VIP for service discovery
this.webSocketService = new WebSocketService(new ClusterData(webServiceAddress, webServiceAddressTls),
config);
this.webSocketService.start();
this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(new WebSocketProducerServlet(webSocketService)), true);
......
......@@ -65,9 +65,17 @@ public class WebSocketService implements Closeable {
private ServiceConfiguration config;
private ConfigurationCacheService configurationCacheService;
private ClusterData localCluster;
public WebSocketService(ServiceConfiguration config) throws PulsarClientException, MalformedURLException,
ServletException, DeploymentException, PulsarServerException {
this(null, config);
}
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) throws PulsarClientException,
MalformedURLException, ServletException, DeploymentException, PulsarServerException {
this.config = config;
this.localCluster = localCluster;
}
public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
......@@ -129,28 +137,37 @@ public class WebSocketService implements Closeable {
public synchronized PulsarClient getPulsarClient() throws IOException {
// Do lazy initialization of client
if (pulsarClient == null) {
ClusterData localCluster;
try {
localCluster = configurationCacheService.clustersCache()
.get("/admin/clusters/" + config.getClusterName());
} catch (Exception e) {
throw new PulsarServerException(e);
if (localCluster == null) {
// If not explicitly set, read clusters data from ZK
localCluster = retrieveClusterData();
}
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}
pulsarClient = createClientInstance(localCluster);
}
return pulsarClient;
}
if (config.isTlsEnabled() && !localCluster.getServiceUrlTls().isEmpty()) {
pulsarClient = PulsarClient.create(localCluster.getServiceUrlTls(), clientConf);
} else {
pulsarClient = PulsarClient.create(localCluster.getServiceUrl(), clientConf);
}
private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}
if (config.isTlsEnabled() && !clusterData.getServiceUrlTls().isEmpty()) {
return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
} else {
return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
}
}
private ClusterData retrieveClusterData() throws PulsarServerException {
try {
return configurationCacheService.clustersCache().get("/admin/clusters/" + config.getClusterName());
} catch (Exception e) {
throw new PulsarServerException(e);
}
return this.pulsarClient;
}
public ConfigurationCacheService getConfigurationCache() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册