提交 cb90409e 编写于 作者: D duhenglucky

Fix connection closed but not clean session issue

上级 81b4293a
......@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
......@@ -193,6 +192,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
......@@ -206,9 +206,9 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr != null) {
......@@ -233,9 +233,9 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr != null) {
......
......@@ -16,12 +16,9 @@
*/
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
public class MQSnodeException extends MQBrokerException {
public MQSnodeException(int responseCode, String errorMessage) {
super(responseCode,errorMessage);
super(responseCode, errorMessage);
}
}
......@@ -56,7 +56,6 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -72,8 +71,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
......@@ -1138,6 +1137,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService;
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
......
......@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.FindBrokerResult;
......@@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
......@@ -86,7 +86,7 @@ import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final ServerConfig nettyServerConfig;
......@@ -124,12 +124,12 @@ public class SnodeController {
private SlowConsumerService slowConsumerService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
......@@ -137,69 +137,69 @@ public class SnodeController {
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
.init(this.getNettyClientConfig(), null);
.init(this.getNettyClientConfig(), null);
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}",
this.snodeConfig.getNamesrvAddr());
this.snodeConfig.getNamesrvAddr());
}
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
......@@ -216,7 +216,7 @@ public class SnodeController {
this.consumerManager = new ConsumerManagerImpl(this);
this.iotClientManager = new IOTClientManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager);
this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
}
......@@ -226,7 +226,7 @@ public class SnodeController {
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) {
if (this.remotingServerInterceptorGroup == null) {
this.remotingServerInterceptorGroup = new InterceptorGroup();
......@@ -234,17 +234,17 @@ public class SnodeController {
for (Interceptor interceptor : remotingServerInterceptors) {
this.remotingServerInterceptorGroup.registerInterceptor(interceptor);
log.warn("Remoting server interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer()
.init(this.nettyServerConfig, this.clientHousekeepingService);
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
......@@ -262,7 +262,7 @@ public class SnodeController {
}
List<AccessValidator> accessValidators = ServiceProvider
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The snode dose not load the AccessValidator");
return;
......@@ -282,7 +282,7 @@ public class SnodeController {
//Do not catch the exception
RemotingCommand request = requestContext.getRequest();
String remoteAddr = RemotingUtil.socketAddress2IpString(
requestContext.getRemotingChannel().remoteAddress());
requestContext.getRemotingChannel().remoteAddress());
validator.validate(validator.parse(request, remoteAddr));
}
......@@ -300,17 +300,17 @@ public class SnodeController {
private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Consume message interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) {
......@@ -323,55 +323,60 @@ public class SnodeController {
public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageExecutor);
this.snodeServer
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this));
new MqttConnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT,
new MqttDisconnectMessageHandler(this));
new MqttDisconnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ,
new MqttPingreqMessageHandler(this));
new MqttPingreqMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH,
new MqttPublishMessageHandler(this));
new MqttPublishMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
.registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this));
new MqttPubcompMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
.registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
defaultMqttMessageProcessor
.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
.registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this));
new MqttSubscribeMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this));
new MqttUnsubscribeMessagHandler(this));
}
public void start() {
......@@ -496,7 +501,7 @@ public class SnodeController {
}
public void setRemotingServerInterceptorGroup(
InterceptorGroup remotingServerInterceptorGroup) {
InterceptorGroup remotingServerInterceptorGroup) {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
......
......@@ -25,7 +25,6 @@ import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.client.impl.ClientRole;
import org.apache.rocketmq.snode.constant.SnodeConstant;
public class ClientHousekeepingService implements ChannelEventListener {
......@@ -53,15 +52,13 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.iotClientManager.shutdown();
}
private ClientRole clientRole(RemotingChannel remotingChannel) {
private Client getClient(RemotingChannel remotingChannel) {
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
if (clientAttribute != null) {
Client client = clientAttribute.get();
if (client != null) {
return client.getClientRole();
}
return client;
}
}
log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
......@@ -69,17 +66,17 @@ public class ClientHousekeepingService implements ChannelEventListener {
}
private void closeChannel(String remoteAddress, RemotingChannel remotingChannel) {
ClientRole clientRole = clientRole(remotingChannel);
if (clientRole != null) {
switch (clientRole) {
Client client = getClient(remotingChannel);
if (client != null) {
switch (client.getClientRole()) {
case Consumer:
this.consumerManager.onClose(remoteAddress, remotingChannel);
this.consumerManager.onClose(client.getGroups(), remotingChannel);
return;
case Producer:
this.producerManager.onClose(remoteAddress, remotingChannel);
this.producerManager.onClose(client.getGroups(), remotingChannel);
return;
case IOTCLIENT:
this.iotClientManager.onClose(remoteAddress, remotingChannel);
this.iotClientManager.onClose(client.getGroups(), remotingChannel);
return;
default:
}
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.client;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.remoting.RemotingChannel;
public interface ClientManager {
......@@ -24,7 +25,7 @@ public interface ClientManager {
void unRegister(String groupId, RemotingChannel remotingChannel);
void onClose(String groupId, RemotingChannel remotingChannel);
void onClose(Set<String> groupId, RemotingChannel remotingChannel);
List<RemotingChannel> getChannels(String groupId);
......
......@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -30,20 +31,22 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.client.Client;
import org.apache.rocketmq.snode.client.ClientManager;
public abstract class ClientManagerImpl implements ClientManager {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(
1024);
1024);
public abstract void onClosed(String group, RemotingChannel remotingChannel);
......@@ -76,7 +79,7 @@ public abstract class ClientManagerImpl implements ClientManager {
Map.Entry entry = (Map.Entry) iterator.next();
String group = (String) entry.getKey();
ConcurrentHashMap<RemotingChannel, Client> channelTable = (ConcurrentHashMap<RemotingChannel, Client>) entry
.getValue();
.getValue();
Iterator iter = channelTable.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry channelTableEntry = (Map.Entry) iter.next();
......@@ -86,14 +89,14 @@ public abstract class ClientManagerImpl implements ClientManager {
iter.remove();
client.getRemotingChannel().close();
log.warn(
"SCAN: Remove expired channel from {}ClientTable. channel={}, group={}",
client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(
client.getRemotingChannel().remoteAddress()), group);
"SCAN: Remove expired channel from {}ClientTable. channel={}, group={}",
client.getClientRole(),
RemotingHelper.parseChannelRemoteAddr(
client.getRemotingChannel().remoteAddress()), group);
if (channelTable.isEmpty()) {
iterator.remove();
log.warn("SCAN: Remove group={} channel from {}ClientTable.", group,
client.getClientRole());
client.getClientRole());
}
}
}
......@@ -104,37 +107,41 @@ public abstract class ClientManagerImpl implements ClientManager {
public boolean register(String groupId, Client client) {
boolean updated = false;
if (client != null) {
ConcurrentHashMap<RemotingChannel, Client> channelTable = groupClientTable.get(groupId);
ConcurrentHashMap<RemotingChannel, Client> channelTable = this.groupClientTable.get(groupId);
if (channelTable == null) {
channelTable = new ConcurrentHashMap();
ConcurrentHashMap prev = groupClientTable.putIfAbsent(groupId, channelTable);
channelTable = prev != null ? prev : channelTable;
}
Client oldClient = channelTable.get(client.getRemotingChannel());
log.info("*********");
RemotingChannel remotingChannel = client.getRemotingChannel();
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel());
}
Client oldClient = channelTable.get(remotingChannel);
if (oldClient == null) {
Client prev = channelTable.put(client.getRemotingChannel(), client);
Client prev = channelTable.put(remotingChannel, client);
if (prev != null) {
log.info("New client connected, group: {} {} {} channel: {}", groupId,
client.toString());
client.toString());
updated = true;
}
oldClient = client;
} else {
if (!oldClient.getClientId().equals(client.getClientId())) {
log.error(
"[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
channelTable.put(client.getRemotingChannel(), client);
"[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
groupId,
oldClient.toString(),
channelTable.toString());
channelTable.put(remotingChannel, client);
}
}
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
onRegister(groupId, remotingChannel);
}
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
client.getLastUpdateTimestamp());
onRegister(groupId, client.getRemotingChannel());
log.info("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
client.getLastUpdateTimestamp());
return updated;
}
......@@ -148,7 +155,7 @@ public abstract class ClientManagerImpl implements ClientManager {
if (channelTable.isEmpty()) {
groupClientTable.remove(groupId);
log.info("Unregister client ok, no any connection, and remove consumer group, {}",
groupId);
groupId);
}
}
}
......@@ -157,12 +164,15 @@ public abstract class ClientManagerImpl implements ClientManager {
public void unRegister(String groupId, RemotingChannel remotingChannel) {
removeClient(groupId, remotingChannel);
onUnregister(groupId, remotingChannel);
}
@Override
public void onClose(String groupId, RemotingChannel remotingChannel) {
removeClient(groupId, remotingChannel);
onClosed(groupId, remotingChannel);
public void onClose(Set<String> groups, RemotingChannel remotingChannel) {
for (String groupId : groups) {
removeClient(groupId, remotingChannel);
onClosed(groupId, remotingChannel);
}
}
public List<RemotingChannel> getChannels(String groupId) {
......@@ -184,7 +194,7 @@ public abstract class ClientManagerImpl implements ClientManager {
Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId);
if (channelClientMap != null) {
Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet()
.iterator();
.iterator();
while (it.hasNext()) {
Map.Entry<RemotingChannel, Client> entry = it.next();
Client client = entry.getValue();
......@@ -201,7 +211,7 @@ public abstract class ClientManagerImpl implements ClientManager {
return null;
}
ConcurrentHashMap<RemotingChannel, Client> channelClientMap = groupClientTable
.get(groupId);
.get(groupId);
return channelClientMap.get(remotingChannel);
}
}
......@@ -130,9 +130,9 @@ public class ConsumerManageProcessor implements RequestProcessor {
if (!clientIds.isEmpty()) {
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
response.setBody(body.encode());
return response;
} else {
log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
......@@ -163,11 +163,11 @@ public class ConsumerManageProcessor implements RequestProcessor {
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
long offset =
this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(),
......
......@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService {
}
@Override
public void registerSnode(SnodeConfig snodeConfig) throws Exception{
public void registerSnode(SnodeConfig snodeConfig) throws Exception {
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
......
......@@ -60,7 +60,7 @@ public class NnodeServiceImplTest extends SnodeTestBase {
}
@Test
public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException,
public void registerSnodeTest() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList());
when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册