diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 914bb5b96107a6b6ff5c16eb25cbfcc54a864e18..8492493d3b6ce6be596b498cc78be92825493ddf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -43,8 +43,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; @@ -102,14 +100,16 @@ import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; -import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; @@ -129,7 +129,7 @@ public class AdminBrokerProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) throws RemotingCommandException { - NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel; + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); switch (request.getCode()) { case RequestCode.UPDATE_AND_CREATE_TOPIC: @@ -250,7 +250,7 @@ public class AdminBrokerProcessor implements RequestProcessor { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); return null; } @@ -440,11 +440,10 @@ public class AdminBrokerProcessor implements RequestProcessor { return response; } - private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + public RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, + RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); - Set lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( requestBody.getConsumerGroup(), requestBody.getMqSet(), @@ -459,7 +458,7 @@ public class AdminBrokerProcessor implements RequestProcessor { return response; } - private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, + public RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 5ca8a19debd4c964938fe25d3e3124d75cfeb047..6911d9a889cb5fd6144a44e631fa6bd369918cc7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -313,38 +313,20 @@ public class SnodeController { } public void registerProcessor() { - this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, - this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, - this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, - this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, - this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, - this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, - this.pullMessageExecutor); - this.snodeServer - .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer - .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer - .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer - .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, - this.consumerManageExecutor); - this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, - defaultMqttMessageProcessor, handleMqttMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); + this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java index 16deb6d1eae8f07afcf2745c398b69b9ed85bb25..7798c0df78c5348ce65c71b459cfce27d3626e64 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java @@ -32,4 +32,6 @@ public class SnodeConstant { public static final AttributeKey NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role"); public static final AttributeKey NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client"); + + public static final String ENODE_NAME = "enodeName"; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index 1953de1cb04a0d4f2f59b4d24c2c25141fb4df79..feb1b3d30548c475afc15ee1232c1c735ab54789 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -74,6 +74,10 @@ public class ConsumerManageProcessor implements RequestProcessor { return getMinOffset(remotingChannel, request); case RequestCode.CREATE_RETRY_TOPIC: return createRetryTopic(remotingChannel, request); + case RequestCode.LOCK_BATCH_MQ: + return lockBatchMQ(remotingChannel, request); + case RequestCode.UNLOCK_BATCH_MQ: + return unlockBatchMQ(remotingChannel, request); default: break; } @@ -214,5 +218,17 @@ public class ConsumerManageProcessor implements RequestProcessor { requestHeader.getEnodeName(); return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request); } + + public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + return this.snodeController.getEnodeService().lockBatchMQ(remotingChannel, request); + } + + public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + return this.snodeController.getEnodeService().unlockBatchMQ(remotingChannel, request); + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java index e2db16eb7589700d4b7476811d0803b61feda426..cbb9c6086867b9b0994c6fd4066fff13653fe067 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java @@ -121,4 +121,10 @@ public interface EnodeService { long timestamp, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException; + + RemotingCommand lockBatchMQ(final RemotingChannel remotingChannel, + final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + + RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel, + final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java index 57891043ec6454ac1cc9391295bc37341013ebf6..e8007775036c2029d0f3323f6299d0fd015f7d2e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.service.impl; +import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; @@ -24,6 +25,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.snode.service.EnodeService; @@ -100,14 +102,6 @@ public class LocalEnodeServiceImpl implements EnodeService { public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic, int queueId, long offset) { try { -// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); -// requestHeader.setConsumerGroup(groupName); -// requestHeader.setTopic(topic); -// requestHeader.setQueueId(queueId); -// requestHeader.setCommitOffset(offset); -// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); -// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request); - this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName, topic, queueId, offset); } catch (Exception ex) { @@ -135,4 +129,17 @@ public class LocalEnodeServiceImpl implements EnodeService { String topic, int queueId, long timestamp, RemotingCommand request) { return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp); } + + @Override public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) { + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; + ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); + return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request); + } + + @Override public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) { + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; + ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); + log.info("un"); + return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request); + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java index 311406fdb422f8ee91a6f399aee824f1f635274b..e305699f56a61cf669f8b4558d41ebcc684b7272 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java @@ -263,4 +263,24 @@ public class RemoteEnodeServiceImpl implements EnodeService { return this.snodeController.getRemotingClient().invokeSync(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } + + @Override + public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return transferToEnode(request); + } + + @Override + public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return transferToEnode(request); + } + + private RemotingCommand transferToEnode( + final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + String enodeName = request.getExtFields().get(SnodeConstant.ENODE_NAME); + String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); + return this.snodeController.getRemotingClient().invokeSync(address, + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java index 79e1d2c4186be51133159e41d1efc1e53268af6c..5d10cfabcbb9e2b8d56f6e52fdbd7839288fce90 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java @@ -101,7 +101,9 @@ public class ScheduledServiceImpl implements ScheduledService { @Override public void run() { try { - snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName()); + if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) { + snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName()); + } } catch (Exception ex) { log.warn("Update broker addr error:{}", ex); } @@ -123,7 +125,9 @@ public class ScheduledServiceImpl implements ScheduledService { @Override public void run() { try { - snodeController.getNnodeService().updateEnodeClusterInfo(); + if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) { + snodeController.getNnodeService().updateEnodeClusterInfo(); + } } catch (Exception ex) { log.warn("Update broker addr error:{}", ex); }