diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 024ce67f97f3741af430ae4e255352d20cb1b184..1cbd39c6ffc36a2a992b6e6cd620d72967c13ea2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -246,14 +246,14 @@ public class BrokerController { result = result && this.messageStore.load(); if (result) { - this.remotingServer = RemotingServerFactory.getRemotingServer(); + this.remotingServer = RemotingServerFactory.createInstance(); this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); // this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); - this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); -// this.fastRemotingServer = RemotingServerFactory.getRemotingServer(); -// this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); +// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); + this.fastRemotingServer = RemotingServerFactory.createInstance(); + this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService); this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index d157021e8dd6a43fbc34dd088cf0aef040806635..9edfcb894d2f264d6909297b9f2a9ed53592f697 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -71,9 +71,7 @@ public class BrokerOuterAPI { } public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { - this.remotingClient = RemotingClientFactory.getClient(); - this.remotingClient.init(nettyClientConfig, null); -// this.remotingClient = new NettyRemotingClient(nettyClientConfig); + this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null); this.remotingClient.registerRPCHook(rpcHook); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java index 22228a6e0ef2cd9231b93a7cba8ca4f1a57cd2ee..1437ffccbda1acaee0d2a2788c4cc9c058cc0a9b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; +import org.apache.rocketmq.remoting.util.ServiceProvider; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 6faccf768ebfbc1ab0372e3b7fa7a1d1f073509d..a329e706a37a50faeb2fd446dfc07c0eff0f2829 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -77,7 +77,7 @@ public class NamesrvController { this.kvConfigManager.load(); - this.remotingServer = RemotingServerFactory.getRemotingServer(); + this.remotingServer = RemotingServerFactory.createInstance(); this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService); // this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index ab4e914ffb5c18f83b589b959a8c6e32ec4bfeb9..88bca572ee0a591bcd4a8299df7d46cf954f762a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -53,5 +53,5 @@ public interface RemotingClient extends RemotingService { boolean isChannelWritable(final String addr); - void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener); + RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java index 5e87ec97baf275bd6ab6c8eafdc208eb74d2720e..a766625a22f32d0793d513ec9b497afacf69c181 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java @@ -4,7 +4,7 @@ import java.util.Map; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.util.RemotingUtil; +import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.util.ServiceProvider; public class RemotingClientFactory { @@ -13,21 +13,21 @@ public class RemotingClientFactory { private RemotingClientFactory() { } - private static Map clients; + private static Map paths; private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient"; static { log.info("begin load client"); - clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class); - log.info("end load client, size:{}", clients.size()); + paths = ServiceProvider.loadPath(CLIENT_LOCATION); + log.info("end load client, size:{}", paths.size()); } - public static RemotingClient getClient(String protocolType) { - return clients.get(protocolType); + public static RemotingClient createInstance(String protocol) { + return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class); } - public static RemotingClient getClient() { - return clients.get(RemotingUtil.DEFAULT_PROTOCOL); + public static RemotingClient createInstance() { + return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java index 6a5fb91a85ccc4e62ce5174d37caff69e9fd0754..0d5ff3828d10fbb535d59dd5d1a389e601eb3c38 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java @@ -51,6 +51,6 @@ public interface RemotingServer extends RemotingService { throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener); + RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java index e7a7700f3fff56553189bf76ecd06aacbca9bb07..125d4e05c3900879dab409e3ad9ee1020ee8a5f1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java @@ -4,7 +4,7 @@ import java.util.Map; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.util.RemotingUtil; +import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.util.ServiceProvider; public class RemotingServerFactory { @@ -14,27 +14,21 @@ public class RemotingServerFactory { private RemotingServerFactory() { } - private static Map servers; - -// private static Map + private static Map protocolPathMap; private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer"; static { log.info("begin load server"); - servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class); - log.info("end load server, size:{}", servers.size()); + protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION); + log.info("end load server, size:{}", protocolPathMap.size()); } - public static RemotingServer getRemotingServer() { - return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL); - } - public static RemotingServer getRemotingServer(String protocolType) { - return servers.get(protocolType); + public static RemotingServer createInstance(String protocol) { + return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class); + } + public static RemotingServer createInstance() { + return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class); } - -// public static RemotingServer createNewInstance(String protocolType){ -// return ServiceProvider.load() -// } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index 3da3a1839603396a1bd592971059b436a1082e08..88008ab643ad035ffe3646bc0ec284573fb12b6b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -41,6 +41,8 @@ public class RemotingUtil { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static boolean isLinuxPlatform = false; private static boolean isWindowsPlatform = false; + public static final String DEFAULT_PROTOCOL = "http2"; + public static final String REMOTING_CHARSET = "UTF-8"; static { if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java index b2949589b466caa9e0a6b8dd1ffd43d2655ded14..65504d1307c8aa4a6939caef8fb1d77ffb784a9e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java @@ -64,7 +64,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo } @Override - public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) { + public RemotingClient init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) { this.nettyClientConfig = clientConfig; this.channelEventListener = channelEventListener; this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", @@ -75,6 +75,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); buildSslContext(); + return this; } private void buildSslContext() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java index 02c4fb6416b8ec9e3a140b925e592f78f4ac7686..6ff3d900e4268e1a0898b5b1f9e211e6cb5678d3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java @@ -74,7 +74,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo } @Override - public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { + public RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.serverConfig = nettyServerConfig; @@ -100,6 +100,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); this.port = nettyServerConfig.getListenPort(); buildHttp2SslContext(); + return this; } private void buildHttp2SslContext() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java index 14666d201ebc7bc8d6dab0a01cf28c894826e414..4e691f12224b0823f6b3d356a0c1ad0107cf7a52 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java @@ -86,7 +86,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements } @Override - public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { + public RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener; this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", @@ -107,6 +107,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements throw new RuntimeException("Failed to create SSLContext", e); } } + return this; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java index 9e7d4194cfa52adb0fb773da2c7e515000033780..d167b490bc9c7850ec4b86396abb342aa5f60375 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java @@ -95,7 +95,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements } @Override - public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) { + public RemotingServer init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) { this.nettyServerConfig = serverConfig; super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); @@ -126,6 +126,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); loadSslContext(); + return this; } public void loadSslContext() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java deleted file mode 100644 index ccd037fecb5dfd6046104338e17e38329e801c70..0000000000000000000000000000000000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.apache.rocketmq.remoting.util; - -public class RemotingUtil { - public static final String REMOTING_CHARSET = "UTF-8"; - public static final String DEFAULT_PROTOCOL = "rocketmq"; -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java index 24fa7213c4f80734494145810620775d499ec3e5..33c8312ed4b676deb3f8ebc5fa42be79faa8a4a9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java @@ -15,6 +15,7 @@ package org.apache.rocketmq.remoting.util; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.logging.InternalLogger; @@ -91,11 +92,10 @@ public class ServiceProvider { } } - public static Map load(String path, Class clazz) { - LOG.info("Looking for a resource file of name [{}] ...", path); - Map services = new ConcurrentHashMap(); + public static Map loadPath(String path) { + LOG.info("Load path looking for a resource file of name [{}] ...", path); + Map pathMap = new HashMap(); try { - final InputStream is = getResourceAsStream(getContextClassLoader(), path); if (is != null) { BufferedReader reader; @@ -106,29 +106,35 @@ public class ServiceProvider { } String serviceName = reader.readLine(); while (serviceName != null && !"".equals(serviceName)) { - LOG.info( - "Creating an instance as specified by file {} which was present in the path of the context classloader.", - path); String[] service = serviceName.split("="); - if (service.length != 2) { - continue; - } else { - if (services.containsKey(service[0])) { + if (service.length == 2) { + if (pathMap.containsKey(service[0])) { continue; } else { - LOG.info("Begin to load protocol: " + service[0]); - services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz)); + pathMap.put(service[0], service[1]); } + } else { + continue; } serviceName = reader.readLine(); } reader.close(); - } else { - // is == null - LOG.warn("No resource file with name [{}] found.", path); } - } catch (Exception e) { - LOG.error("Error occured when looking for resource file " + path, e); + } catch (Exception ex) { + LOG.error("Error occured when looking for resource file " + path, ex); + } + return pathMap; + } + + public static Map load(String path, Class clazz) { + LOG.info("Load map is looking for a resource file of name [{}] ...", path); + Map services = new HashMap(); + Map pathMaps = loadPath(path); + for (Map.Entry entry : pathMaps.entrySet()) { + T instance = (T) createInstance(entry.getValue(), clazz); + if (instance != null && !services.containsKey(entry.getKey())) { + services.put(entry.getKey(), instance); + } } return services; } @@ -145,12 +151,7 @@ public class ServiceProvider { } String serviceName = reader.readLine(); reader.close(); - if (serviceName != null && !"".equals(serviceName)) { - return initService(getContextClassLoader(), serviceName, clazz); - } else { - LOG.warn("ServiceName is empty!"); - return null; - } + return createInstance(serviceName, clazz); } catch (Exception e) { LOG.warn("Error occurred when looking for resource file " + name, e); } @@ -158,6 +159,15 @@ public class ServiceProvider { return null; } + public static T createInstance(String serviceName, Class clazz) { + if (serviceName != null && !"".equals(serviceName)) { + return initService(getContextClassLoader(), serviceName, clazz); + } else { + LOG.warn("ServiceName is empty!"); + return null; + } + } + protected static T initService(ClassLoader classLoader, String serviceName, Class clazz) { Class serviceClazz = null; try {