提交 30be621e 编写于 作者: M Matteo Merli 提交者: GitHub

Simplified the instantiation of Netty EventLoopGroup (#570)

上级 524ff06e
...@@ -54,7 +54,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; ...@@ -54,7 +54,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.PulsarService;
...@@ -95,6 +94,7 @@ import org.apache.pulsar.common.util.FieldParser; ...@@ -95,6 +94,7 @@ import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener; import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
...@@ -115,12 +115,6 @@ import io.netty.buffer.PooledByteBufAllocator; ...@@ -115,12 +115,6 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> { public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> {
...@@ -200,22 +194,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies ...@@ -200,22 +194,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
final int numThreads = Runtime.getRuntime().availableProcessors() * 2; final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
log.info("Using {} threads for broker service IO", numThreads); log.info("Using {} threads for broker service IO", numThreads);
EventLoopGroup acceptorEventLoop, workersEventLoop; this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
if (SystemUtils.IS_OS_LINUX) { this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
try {
acceptorEventLoop = new EpollEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new EpollEventLoopGroup(numThreads, workersThreadFactory);
} catch (UnsatisfiedLinkError e) {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
} else {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
this.acceptorGroup = acceptorEventLoop;
this.workerGroup = workersEventLoop;
this.statsUpdater = Executors this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater")); .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
if (pulsar.getConfiguration().isAuthorizationEnabled()) { if (pulsar.getConfiguration().isAuthorizationEnabled()) {
...@@ -286,12 +266,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies ...@@ -286,12 +266,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
if (workerGroup instanceof EpollEventLoopGroup) { bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
bootstrap.channel(EpollServerSocketChannel.class); EventLoopUtil.enableTriggeredMode(bootstrap);
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
ServiceConfiguration serviceConfig = pulsar.getConfiguration(); ServiceConfiguration serviceConfig = pulsar.getConfiguration();
......
...@@ -27,7 +27,6 @@ import javax.servlet.http.HttpServletRequest; ...@@ -27,7 +27,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandAckHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandAckHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseConsumerHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseConsumerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook;
...@@ -43,11 +42,12 @@ import org.apache.pulsar.common.api.Commands; ...@@ -43,11 +42,12 @@ import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
...@@ -62,11 +62,7 @@ import io.netty.buffer.ByteBuf; ...@@ -62,11 +62,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/** /**
...@@ -306,28 +302,13 @@ public class MockBrokerService { ...@@ -306,28 +302,13 @@ public class MockBrokerService {
final int numThreads = 2; final int numThreads = 2;
final int MaxMessageSize = 5 * 1024 * 1024; final int MaxMessageSize = 5 * 1024 * 1024;
EventLoopGroup eventLoopGroup;
try { try {
if (SystemUtils.IS_OS_LINUX) { workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
try {
eventLoopGroup = new EpollEventLoopGroup(numThreads, threadFactory);
} catch (UnsatisfiedLinkError e) {
eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
}
} else {
eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
}
workerGroup = eventLoopGroup;
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(workerGroup, workerGroup); bootstrap.group(workerGroup, workerGroup);
if (workerGroup instanceof EpollEventLoopGroup) { bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
bootstrap.channel(EpollServerSocketChannel.class);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
......
...@@ -28,11 +28,11 @@ import java.util.concurrent.CompletableFuture; ...@@ -28,11 +28,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.PulsarLengthFieldFrameDecoder; import org.apache.pulsar.common.api.PulsarLengthFieldFrameDecoder;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -43,10 +43,7 @@ import io.netty.channel.ChannelFuture; ...@@ -43,10 +43,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
...@@ -68,11 +65,7 @@ public class ConnectionPool implements Closeable { ...@@ -68,11 +65,7 @@ public class ConnectionPool implements Closeable {
pool = new ConcurrentHashMap<>(); pool = new ConcurrentHashMap<>();
bootstrap = new Bootstrap(); bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup); bootstrap.group(eventLoopGroup);
if (SystemUtils.IS_OS_LINUX && eventLoopGroup instanceof EpollEventLoopGroup) { bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
bootstrap.channel(EpollSocketChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
bootstrap.option(ChannelOption.TCP_NODELAY, client.getConfiguration().isUseTcpNoDelay()); bootstrap.option(ChannelOption.TCP_NODELAY, client.getConfiguration().isUseTcpNoDelay());
......
...@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; ...@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ConsumerConfiguration;
...@@ -43,6 +42,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; ...@@ -43,6 +42,7 @@ import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -50,8 +50,6 @@ import com.google.common.collect.Lists; ...@@ -50,8 +50,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
...@@ -468,20 +466,7 @@ public class PulsarClientImpl implements PulsarClient { ...@@ -468,20 +466,7 @@ public class PulsarClientImpl implements PulsarClient {
private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) { private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) {
int numThreads = conf.getIoThreads(); int numThreads = conf.getIoThreads();
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io"); ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
if (SystemUtils.IS_OS_LINUX) {
try {
return new EpollEventLoopGroup(numThreads, threadFactory);
} catch (ExceptionInInitializerError | NoClassDefFoundError | UnsatisfiedLinkError e) {
if (log.isDebugEnabled()) {
log.debug("Unable to load EpollEventLoop", e);
}
return new NioEventLoopGroup(numThreads, threadFactory);
}
} else {
return new NioEventLoopGroup(numThreads, threadFactory);
}
} }
void cleanupProducer(ProducerBase producer) { void cleanupProducer(ProducerBase producer) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util.netty;
import java.util.concurrent.ThreadFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class EventLoopUtil {
/**
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(nThreads, threadFactory);
} else {
// Fallback to NIO
return new NioEventLoopGroup(nThreads, threadFactory);
}
}
/**
* Return a SocketChannel class suitable for the given EventLoopGroup implementation
*
* @param eventLoopGroup
* @return
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollSocketChannel.class;
} else {
return NioSocketChannel.class;
}
}
public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}
public static void enableTriggeredMode(ServerBootstrap bootstrap) {
if (Epoll.isAvailable()) {
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}
}
...@@ -24,11 +24,11 @@ import java.io.Closeable; ...@@ -24,11 +24,11 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationManager; import org.apache.pulsar.broker.authorization.AuthorizationManager;
import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.discovery.service.server.ServiceConfig; import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
...@@ -40,12 +40,6 @@ import io.netty.buffer.PooledByteBufAllocator; ...@@ -40,12 +40,6 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
/** /**
...@@ -74,21 +68,8 @@ public class DiscoveryService implements Closeable { ...@@ -74,21 +68,8 @@ public class DiscoveryService implements Closeable {
this.config = serviceConfig; this.config = serviceConfig;
this.serviceUrl = serviceUrl(); this.serviceUrl = serviceUrl();
this.serviceUrlTls = serviceUrlTls(); this.serviceUrlTls = serviceUrlTls();
EventLoopGroup acceptorEventLoop, workersEventLoop; this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
if (SystemUtils.IS_OS_LINUX) { this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
try {
acceptorEventLoop = new EpollEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new EpollEventLoopGroup(numThreads, workersThreadFactory);
} catch (UnsatisfiedLinkError e) {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
} else {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
this.acceptorGroup = acceptorEventLoop;
this.workerGroup = workersEventLoop;
} }
/** /**
...@@ -117,13 +98,8 @@ public class DiscoveryService implements Closeable { ...@@ -117,13 +98,8 @@ public class DiscoveryService implements Closeable {
bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
if (workerGroup instanceof EpollEventLoopGroup) { EventLoopUtil.enableTriggeredMode(bootstrap);
bootstrap.channel(EpollServerSocketChannel.class);
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new ServiceChannelInitializer(this, config, false)); bootstrap.childHandler(new ServiceChannelInitializer(this, config, false));
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
......
...@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Authentication; ...@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
...@@ -45,13 +46,6 @@ import io.netty.buffer.PooledByteBufAllocator; ...@@ -45,13 +46,6 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
/** /**
...@@ -96,13 +90,8 @@ public class ProxyService implements Closeable { ...@@ -96,13 +90,8 @@ public class ProxyService implements Closeable {
this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort()); this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort());
this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls()); this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls());
if (Epoll.isAvailable()) { this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.acceptorGroup = new EpollEventLoopGroup(1, acceptorThreadFactory); this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
this.workerGroup = new EpollEventLoopGroup(numThreads, workersThreadFactory);
} else {
this.acceptorGroup = new NioEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
ClientConfiguration clientConfiguration = new ClientConfiguration(); ClientConfiguration clientConfiguration = new ClientConfiguration();
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
...@@ -138,12 +127,8 @@ public class ProxyService implements Closeable { ...@@ -138,12 +127,8 @@ public class ProxyService implements Closeable {
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
if (workerGroup instanceof EpollEventLoopGroup) { bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
bootstrap.channel(EpollServerSocketChannel.class); EventLoopUtil.enableTriggeredMode(bootstrap);
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false)); bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false));
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
......
...@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; ...@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
...@@ -53,9 +52,6 @@ import com.beust.jcommander.Parameter; ...@@ -53,9 +52,6 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException; import com.beust.jcommander.ParameterException;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
/** /**
...@@ -320,14 +316,11 @@ public class LoadSimulationClient { ...@@ -320,14 +316,11 @@ public class LoadSimulationClient {
public LoadSimulationClient(final MainArguments arguments) throws Exception { public LoadSimulationClient(final MainArguments arguments) throws Exception {
payloadCache = new ConcurrentHashMap<>(); payloadCache = new ConcurrentHashMap<>();
topicsToTradeUnits = new ConcurrentHashMap<>(); topicsToTradeUnits = new ConcurrentHashMap<>();
final EventLoopGroup eventLoopGroup = SystemUtils.IS_OS_LINUX
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"))
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"));
clientConf = new ClientConfiguration(); clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(4); clientConf.setConnectionsPerBroker(4);
clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
// Disable stats on the clients to reduce CPU/memory usage. // Disable stats on the clients to reduce CPU/memory usage.
clientConf.setStatsInterval(0, TimeUnit.SECONDS); clientConf.setStatsInterval(0, TimeUnit.SECONDS);
...@@ -345,7 +338,7 @@ public class LoadSimulationClient { ...@@ -345,7 +338,7 @@ public class LoadSimulationClient {
consumerConf = new ConsumerConfiguration(); consumerConf = new ConsumerConfiguration();
consumerConf.setMessageListener(ackListener); consumerConf.setMessageListener(ackListener);
admin = new PulsarAdmin(new URL(arguments.serviceURL), clientConf); admin = new PulsarAdmin(new URL(arguments.serviceURL), clientConf);
client = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup); client = new PulsarClientImpl(arguments.serviceURL, clientConf);
port = arguments.port; port = arguments.port;
executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client")); executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client"));
} }
......
...@@ -24,12 +24,10 @@ import java.io.FileInputStream; ...@@ -24,12 +24,10 @@ import java.io.FileInputStream;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ConsumerConfiguration;
...@@ -49,11 +47,6 @@ import com.fasterxml.jackson.databind.ObjectWriter; ...@@ -49,11 +47,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
public class PerformanceConsumer { public class PerformanceConsumer {
private static final LongAdder messagesReceived = new LongAdder(); private static final LongAdder messagesReceived = new LongAdder();
private static final LongAdder bytesReceived = new LongAdder(); private static final LongAdder bytesReceived = new LongAdder();
...@@ -176,22 +169,14 @@ public class PerformanceConsumer { ...@@ -176,22 +169,14 @@ public class PerformanceConsumer {
} }
}; };
EventLoopGroup eventLoopGroup;
if (SystemUtils.IS_OS_LINUX) {
eventLoopGroup = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,
new DefaultThreadFactory("pulsar-perf-consumer"));
} else {
eventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-perf-consumer"));
}
ClientConfiguration clientConf = new ClientConfiguration(); ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(arguments.maxConnections); clientConf.setConnectionsPerBroker(arguments.maxConnections);
clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
if (isNotBlank(arguments.authPluginClassName)) { if (isNotBlank(arguments.authPluginClassName)) {
clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
} }
PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup); PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
List<Future<Consumer>> futures = Lists.newArrayList(); List<Future<Consumer>> futures = Lists.newArrayList();
ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
......
...@@ -40,13 +40,12 @@ import java.util.concurrent.atomic.LongAdder; ...@@ -40,13 +40,12 @@ import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram; import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter; import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder; import org.HdrHistogram.Recorder;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -60,9 +59,6 @@ import com.fasterxml.jackson.databind.ObjectWriter; ...@@ -60,9 +59,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
public class PerformanceProducer { public class PerformanceProducer {
...@@ -207,23 +203,15 @@ public class PerformanceProducer { ...@@ -207,23 +203,15 @@ public class PerformanceProducer {
String prefixTopicName = arguments.destinations.get(0); String prefixTopicName = arguments.destinations.get(0);
List<Future<Producer>> futures = Lists.newArrayList(); List<Future<Producer>> futures = Lists.newArrayList();
EventLoopGroup eventLoopGroup;
if (SystemUtils.IS_OS_LINUX) {
eventLoopGroup = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-perf-producer"));
} else {
eventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-perf-producer"));
}
ClientConfiguration clientConf = new ClientConfiguration(); ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(arguments.maxConnections); clientConf.setConnectionsPerBroker(arguments.maxConnections);
clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
if (isNotBlank(arguments.authPluginClassName)) { if (isNotBlank(arguments.authPluginClassName)) {
clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
} }
PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup); PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf);
ProducerConfiguration producerConf = new ProducerConfiguration(); ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setSendTimeout(0, TimeUnit.SECONDS); producerConf.setSendTimeout(0, TimeUnit.SECONDS);
......
...@@ -28,8 +28,6 @@ import java.util.concurrent.CompletableFuture; ...@@ -28,8 +28,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.bookie.storage.ldb.ArrayGroupSort;
import org.apache.commons.lang.SystemUtils;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
...@@ -51,11 +49,6 @@ import com.fasterxml.jackson.databind.ObjectWriter; ...@@ -51,11 +49,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
public class PerformanceReader { public class PerformanceReader {
private static final LongAdder messagesReceived = new LongAdder(); private static final LongAdder messagesReceived = new LongAdder();
private static final LongAdder bytesReceived = new LongAdder(); private static final LongAdder bytesReceived = new LongAdder();
...@@ -172,22 +165,14 @@ public class PerformanceReader { ...@@ -172,22 +165,14 @@ public class PerformanceReader {
} }
}; };
EventLoopGroup eventLoopGroup;
if (SystemUtils.IS_OS_LINUX) {
eventLoopGroup = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,
new DefaultThreadFactory("pulsar-perf-reader"));
} else {
eventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-perf-reader"));
}
ClientConfiguration clientConf = new ClientConfiguration(); ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(arguments.maxConnections); clientConf.setConnectionsPerBroker(arguments.maxConnections);
clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS);
clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
if (isNotBlank(arguments.authPluginClassName)) { if (isNotBlank(arguments.authPluginClassName)) {
clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
} }
PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup); PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf);
List<CompletableFuture<Reader>> futures = Lists.newArrayList(); List<CompletableFuture<Reader>> futures = Lists.newArrayList();
ReaderConfiguration readerConfig = new ReaderConfiguration(); ReaderConfiguration readerConfig = new ReaderConfiguration();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册