提交 3706778b 编写于 作者: D duhenglucky

Add lockMQ function for support order message

上级 41d5adc0
...@@ -43,8 +43,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper; ...@@ -43,8 +43,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -102,14 +100,16 @@ import org.apache.rocketmq.common.stats.StatsItem; ...@@ -102,14 +100,16 @@ import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.filter.util.BitsArray; import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.ConsumeQueueExt;
...@@ -129,7 +129,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -129,7 +129,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
@Override @Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel; NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) { switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC: case RequestCode.UPDATE_AND_CREATE_TOPIC:
...@@ -250,7 +250,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -250,7 +250,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null; return null;
} }
...@@ -440,11 +440,10 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -440,11 +440,10 @@ public class AdminBrokerProcessor implements RequestProcessor {
return response; return response;
} }
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, public RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(), requestBody.getConsumerGroup(),
requestBody.getMqSet(), requestBody.getMqSet(),
...@@ -459,7 +458,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -459,7 +458,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
return response; return response;
} }
private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, public RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
......
...@@ -313,38 +313,20 @@ public class SnodeController { ...@@ -313,38 +313,20 @@ public class SnodeController {
} }
public void registerProcessor() { public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
} }
......
...@@ -32,4 +32,6 @@ public class SnodeConstant { ...@@ -32,4 +32,6 @@ public class SnodeConstant {
public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role"); public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client"); public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
public static final String ENODE_NAME = "enodeName";
} }
...@@ -74,6 +74,10 @@ public class ConsumerManageProcessor implements RequestProcessor { ...@@ -74,6 +74,10 @@ public class ConsumerManageProcessor implements RequestProcessor {
return getMinOffset(remotingChannel, request); return getMinOffset(remotingChannel, request);
case RequestCode.CREATE_RETRY_TOPIC: case RequestCode.CREATE_RETRY_TOPIC:
return createRetryTopic(remotingChannel, request); return createRetryTopic(remotingChannel, request);
case RequestCode.LOCK_BATCH_MQ:
return lockBatchMQ(remotingChannel, request);
case RequestCode.UNLOCK_BATCH_MQ:
return unlockBatchMQ(remotingChannel, request);
default: default:
break; break;
} }
...@@ -214,5 +218,17 @@ public class ConsumerManageProcessor implements RequestProcessor { ...@@ -214,5 +218,17 @@ public class ConsumerManageProcessor implements RequestProcessor {
requestHeader.getEnodeName(); requestHeader.getEnodeName();
return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request); return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request);
} }
public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return this.snodeController.getEnodeService().lockBatchMQ(remotingChannel, request);
}
public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return this.snodeController.getEnodeService().unlockBatchMQ(remotingChannel, request);
}
} }
...@@ -121,4 +121,10 @@ public interface EnodeService { ...@@ -121,4 +121,10 @@ public interface EnodeService {
long timestamp, long timestamp,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, RemotingCommandException; RemotingConnectException, RemotingCommandException;
RemotingCommand lockBatchMQ(final RemotingChannel remotingChannel,
final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel,
final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.snode.service.impl;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -24,6 +25,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -24,6 +25,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.EnodeService;
...@@ -100,14 +102,6 @@ public class LocalEnodeServiceImpl implements EnodeService { ...@@ -100,14 +102,6 @@ public class LocalEnodeServiceImpl implements EnodeService {
public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic, public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
int queueId, long offset) { int queueId, long offset) {
try { try {
// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
// requestHeader.setConsumerGroup(groupName);
// requestHeader.setTopic(topic);
// requestHeader.setQueueId(queueId);
// requestHeader.setCommitOffset(offset);
// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request);
this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName, this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName,
topic, queueId, offset); topic, queueId, offset);
} catch (Exception ex) { } catch (Exception ex) {
...@@ -135,4 +129,17 @@ public class LocalEnodeServiceImpl implements EnodeService { ...@@ -135,4 +129,17 @@ public class LocalEnodeServiceImpl implements EnodeService {
String topic, int queueId, long timestamp, RemotingCommand request) { String topic, int queueId, long timestamp, RemotingCommand request) {
return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp); return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp);
} }
@Override public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
}
@Override public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
log.info("un");
return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
}
} }
...@@ -263,4 +263,24 @@ public class RemoteEnodeServiceImpl implements EnodeService { ...@@ -263,4 +263,24 @@ public class RemoteEnodeServiceImpl implements EnodeService {
return this.snodeController.getRemotingClient().invokeSync(address, return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
@Override
public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return transferToEnode(request);
}
@Override
public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return transferToEnode(request);
}
private RemotingCommand transferToEnode(
final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
String enodeName = request.getExtFields().get(SnodeConstant.ENODE_NAME);
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
} }
...@@ -101,7 +101,9 @@ public class ScheduledServiceImpl implements ScheduledService { ...@@ -101,7 +101,9 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override @Override
public void run() { public void run() {
try { try {
snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName()); if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
}
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Update broker addr error:{}", ex); log.warn("Update broker addr error:{}", ex);
} }
...@@ -123,7 +125,9 @@ public class ScheduledServiceImpl implements ScheduledService { ...@@ -123,7 +125,9 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override @Override
public void run() { public void run() {
try { try {
snodeController.getNnodeService().updateEnodeClusterInfo(); if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
snodeController.getNnodeService().updateEnodeClusterInfo();
}
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Update broker addr error:{}", ex); log.warn("Update broker addr error:{}", ex);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册