提交 c668abb8 编写于 作者: D duhengforever

Modify Serviceloader implementation

上级 71696dc1
...@@ -246,14 +246,14 @@ public class BrokerController { ...@@ -246,14 +246,14 @@ public class BrokerController {
result = result && this.messageStore.load(); result = result && this.messageStore.load();
if (result) { if (result) {
this.remotingServer = RemotingServerFactory.getRemotingServer(); this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); // this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); // this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// this.fastRemotingServer = RemotingServerFactory.getRemotingServer(); this.fastRemotingServer = RemotingServerFactory.createInstance();
// this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService); this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(),
......
...@@ -71,9 +71,7 @@ public class BrokerOuterAPI { ...@@ -71,9 +71,7 @@ public class BrokerOuterAPI {
} }
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = RemotingClientFactory.getClient(); this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
this.remotingClient.init(nettyClientConfig, null);
// this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerRPCHook(rpcHook);
} }
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService; import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.remoting.util.ServiceProvider;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
......
...@@ -77,7 +77,7 @@ public class NamesrvController { ...@@ -77,7 +77,7 @@ public class NamesrvController {
this.kvConfigManager.load(); this.kvConfigManager.load();
this.remotingServer = RemotingServerFactory.getRemotingServer(); this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
......
...@@ -53,5 +53,5 @@ public interface RemotingClient extends RemotingService { ...@@ -53,5 +53,5 @@ public interface RemotingClient extends RemotingService {
boolean isChannelWritable(final String addr); boolean isChannelWritable(final String addr);
void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener); RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
} }
...@@ -4,7 +4,7 @@ import java.util.Map; ...@@ -4,7 +4,7 @@ import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.util.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingClientFactory { public class RemotingClientFactory {
...@@ -13,21 +13,21 @@ public class RemotingClientFactory { ...@@ -13,21 +13,21 @@ public class RemotingClientFactory {
private RemotingClientFactory() { private RemotingClientFactory() {
} }
private static Map<String, RemotingClient> clients; private static Map<String, String> paths;
private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient"; private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
static { static {
log.info("begin load client"); log.info("begin load client");
clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class); paths = ServiceProvider.loadPath(CLIENT_LOCATION);
log.info("end load client, size:{}", clients.size()); log.info("end load client, size:{}", paths.size());
} }
public static RemotingClient getClient(String protocolType) { public static RemotingClient createInstance(String protocol) {
return clients.get(protocolType); return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
} }
public static RemotingClient getClient() { public static RemotingClient createInstance() {
return clients.get(RemotingUtil.DEFAULT_PROTOCOL); return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class);
} }
} }
...@@ -51,6 +51,6 @@ public interface RemotingServer extends RemotingService { ...@@ -51,6 +51,6 @@ public interface RemotingServer extends RemotingService {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException; RemotingSendRequestException;
void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener); RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
} }
...@@ -4,7 +4,7 @@ import java.util.Map; ...@@ -4,7 +4,7 @@ import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.util.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider; import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingServerFactory { public class RemotingServerFactory {
...@@ -14,27 +14,21 @@ public class RemotingServerFactory { ...@@ -14,27 +14,21 @@ public class RemotingServerFactory {
private RemotingServerFactory() { private RemotingServerFactory() {
} }
private static Map<String, RemotingServer> servers; private static Map<String, String> protocolPathMap;
// private static Map<String/*protocolType*/, String/*path*/ >
private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer"; private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
static { static {
log.info("begin load server"); log.info("begin load server");
servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class); protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
log.info("end load server, size:{}", servers.size()); log.info("end load server, size:{}", protocolPathMap.size());
} }
public static RemotingServer getRemotingServer() {
return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
}
public static RemotingServer getRemotingServer(String protocolType) { public static RemotingServer createInstance(String protocol) {
return servers.get(protocolType); return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class);
}
public static RemotingServer createInstance() {
return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class);
} }
// public static RemotingServer createNewInstance(String protocolType){
// return ServiceProvider.load()
// }
} }
...@@ -41,6 +41,8 @@ public class RemotingUtil { ...@@ -41,6 +41,8 @@ public class RemotingUtil {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static boolean isLinuxPlatform = false; private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false; private static boolean isWindowsPlatform = false;
public static final String DEFAULT_PROTOCOL = "http2";
public static final String REMOTING_CHARSET = "UTF-8";
static { static {
if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) { if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
......
...@@ -64,7 +64,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -64,7 +64,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
} }
@Override @Override
public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) { public RemotingClient init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = clientConfig; this.nettyClientConfig = clientConfig;
this.channelEventListener = channelEventListener; this.channelEventListener = channelEventListener;
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
...@@ -75,6 +75,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -75,6 +75,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildSslContext(); buildSslContext();
return this;
} }
private void buildSslContext() { private void buildSslContext() {
......
...@@ -74,7 +74,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -74,7 +74,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
} }
@Override @Override
public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { public RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap(); this.serverBootstrap = new ServerBootstrap();
this.serverConfig = nettyServerConfig; this.serverConfig = nettyServerConfig;
...@@ -100,6 +100,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -100,6 +100,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort(); this.port = nettyServerConfig.getListenPort();
buildHttp2SslContext(); buildHttp2SslContext();
return this;
} }
private void buildHttp2SslContext() { private void buildHttp2SslContext() {
......
...@@ -86,7 +86,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -86,7 +86,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
} }
@Override @Override
public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { public RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig; this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener; this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
...@@ -107,6 +107,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -107,6 +107,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
throw new RuntimeException("Failed to create SSLContext", e); throw new RuntimeException("Failed to create SSLContext", e);
} }
} }
return this;
} }
@Override @Override
......
...@@ -95,7 +95,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -95,7 +95,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
} }
@Override @Override
public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) { public RemotingServer init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig; this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap(); this.serverBootstrap = new ServerBootstrap();
...@@ -126,6 +126,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -126,6 +126,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(), this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
loadSslContext(); loadSslContext();
return this;
} }
public void loadSslContext() { public void loadSslContext() {
......
package org.apache.rocketmq.remoting.util;
public class RemotingUtil {
public static final String REMOTING_CHARSET = "UTF-8";
public static final String DEFAULT_PROTOCOL = "rocketmq";
}
...@@ -15,6 +15,7 @@ package org.apache.rocketmq.remoting.util; ...@@ -15,6 +15,7 @@ package org.apache.rocketmq.remoting.util;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -91,11 +92,10 @@ public class ServiceProvider { ...@@ -91,11 +92,10 @@ public class ServiceProvider {
} }
} }
public static <T> Map<String, T> load(String path, Class<?> clazz) { public static Map<String, String> loadPath(String path) {
LOG.info("Looking for a resource file of name [{}] ...", path); LOG.info("Load path looking for a resource file of name [{}] ...", path);
Map<String, T> services = new ConcurrentHashMap<String, T>(); Map<String, String> pathMap = new HashMap<String, String>();
try { try {
final InputStream is = getResourceAsStream(getContextClassLoader(), path); final InputStream is = getResourceAsStream(getContextClassLoader(), path);
if (is != null) { if (is != null) {
BufferedReader reader; BufferedReader reader;
...@@ -106,29 +106,35 @@ public class ServiceProvider { ...@@ -106,29 +106,35 @@ public class ServiceProvider {
} }
String serviceName = reader.readLine(); String serviceName = reader.readLine();
while (serviceName != null && !"".equals(serviceName)) { while (serviceName != null && !"".equals(serviceName)) {
LOG.info(
"Creating an instance as specified by file {} which was present in the path of the context classloader.",
path);
String[] service = serviceName.split("="); String[] service = serviceName.split("=");
if (service.length != 2) { if (service.length == 2) {
continue; if (pathMap.containsKey(service[0])) {
} else {
if (services.containsKey(service[0])) {
continue; continue;
} else { } else {
LOG.info("Begin to load protocol: " + service[0]); pathMap.put(service[0], service[1]);
services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz));
} }
} else {
continue;
} }
serviceName = reader.readLine(); serviceName = reader.readLine();
} }
reader.close(); reader.close();
} else {
// is == null
LOG.warn("No resource file with name [{}] found.", path);
} }
} catch (Exception e) { } catch (Exception ex) {
LOG.error("Error occured when looking for resource file " + path, e); LOG.error("Error occured when looking for resource file " + path, ex);
}
return pathMap;
}
public static <T> Map<String, T> load(String path, Class<?> clazz) {
LOG.info("Load map is looking for a resource file of name [{}] ...", path);
Map<String, T> services = new HashMap<String, T>();
Map<String, String> pathMaps = loadPath(path);
for (Map.Entry<String, String> entry : pathMaps.entrySet()) {
T instance = (T) createInstance(entry.getValue(), clazz);
if (instance != null && !services.containsKey(entry.getKey())) {
services.put(entry.getKey(), instance);
}
} }
return services; return services;
} }
...@@ -145,12 +151,7 @@ public class ServiceProvider { ...@@ -145,12 +151,7 @@ public class ServiceProvider {
} }
String serviceName = reader.readLine(); String serviceName = reader.readLine();
reader.close(); reader.close();
if (serviceName != null && !"".equals(serviceName)) { return createInstance(serviceName, clazz);
return initService(getContextClassLoader(), serviceName, clazz);
} else {
LOG.warn("ServiceName is empty!");
return null;
}
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Error occurred when looking for resource file " + name, e); LOG.warn("Error occurred when looking for resource file " + name, e);
} }
...@@ -158,6 +159,15 @@ public class ServiceProvider { ...@@ -158,6 +159,15 @@ public class ServiceProvider {
return null; return null;
} }
public static <T> T createInstance(String serviceName, Class<?> clazz) {
if (serviceName != null && !"".equals(serviceName)) {
return initService(getContextClassLoader(), serviceName, clazz);
} else {
LOG.warn("ServiceName is empty!");
return null;
}
}
protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) { protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
Class<?> serviceClazz = null; Class<?> serviceClazz = null;
try { try {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册