提交 bd3fa562 编写于 作者: J jai1 提交者: Matteo Merli

Removing blockIfQueueFull option from WebSocket Proxy Server (#396)

上级 7badf1ab
......@@ -138,10 +138,9 @@ public class ProducerHandler extends AbstractWebSocketHandler {
private ProducerConfiguration getProducerConfiguration() {
ProducerConfiguration conf = new ProducerConfiguration();
if (queryParams.containsKey("blockIfQueueFull")) {
conf.setBlockIfQueueFull(Boolean.parseBoolean(queryParams.get("blockIfQueueFull")));
}
// Set to false to prevent the server thread from being blocked if a lot of messages are pending.
conf.setBlockIfQueueFull(false);
if (queryParams.containsKey("sendTimeoutMillis")) {
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
}
......
......@@ -60,9 +60,10 @@ public class WebSocketService implements Closeable {
AuthorizationManager authorizationManager;
PulsarClient pulsarClient;
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar-websocket"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-websocket-ordered");
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, new DefaultThreadFactory("pulsar-websocket"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(
WebSocketProxyConfiguration.GLOBAL_ZK_THREADS, "pulsar-websocket-ordered");
private GlobalZooKeeperCache globalZkCache;
private ZooKeeperClientFactory zkClientFactory;
private ServiceConfiguration config;
......@@ -162,6 +163,7 @@ public class WebSocketService implements Closeable {
clientConf.setUseTls(config.isTlsEnabled());
clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
clientConf.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
clientConf.setIoThreads(WebSocketProxyConfiguration.PULSAR_CLIENT_IO_THREADS);
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
......@@ -189,7 +191,7 @@ public class WebSocketService implements Closeable {
return null;
}
}
private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) {
ServiceConfiguration serviceConfig = new ServiceConfiguration();
serviceConfig.setClusterName(config.getClusterName());
......
......@@ -50,15 +50,15 @@ import com.yahoo.pulsar.common.util.SecurityUtility;
import io.netty.util.concurrent.DefaultThreadFactory;
public class ProxyServer {
private final ExecutorService executorService;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final WebSocketProxyConfiguration conf;
private final ExecutorService executorService;
public ProxyServer(WebSocketProxyConfiguration config)
throws PulsarClientException, MalformedURLException, PulsarServerException {
this.conf = config;
this.executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(),
executorService = Executors.newFixedThreadPool(WebSocketProxyConfiguration.PROXY_SERVER_EXECUTOR_THREADS,
new DefaultThreadFactory("pulsar-websocket-web"));
this.server = new Server(new ExecutorThreadPool(executorService));
List<ServerConnector> connectors = new ArrayList<>();
......@@ -129,6 +129,7 @@ public class ProxyServer {
public void stop() throws Exception {
server.stop();
executorService.shutdown();
}
private static final Logger log = LoggerFactory.getLogger(ProxyServer.class);
......
......@@ -24,6 +24,15 @@ import com.yahoo.pulsar.common.configuration.PulsarConfiguration;
public class WebSocketProxyConfiguration implements PulsarConfiguration {
// Number of threads used by Proxy server
public static final int PROXY_SERVER_EXECUTOR_THREADS = 2 * Runtime.getRuntime().availableProcessors();
// Number of threads used by Websocket service
public static final int WEBSOCKET_SERVICE_THREADS = 20;
// Number of threads used by Global ZK
public static final int GLOBAL_ZK_THREADS = 8;
// Number of IO threads in Pulsar Client
public static final int PULSAR_CLIENT_IO_THREADS = Runtime.getRuntime().availableProcessors();
// Name of the cluster to which this broker belongs to
@FieldContext(required = true)
private String clusterName;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册