提交 32f58c79 编写于 作者: D dongeforever

Polish the topic queue mapping context, and process the conext for ConsumerManager

上级 f5285b00
...@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext; ...@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List; import java.util.List;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -109,6 +110,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -109,6 +110,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
final UpdateConsumerOffsetRequestHeader requestHeader = final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request (UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
...@@ -126,6 +132,12 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -126,6 +132,12 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(QueryConsumerOffsetRequestHeader) request (QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
long offset = long offset =
this.brokerController.getConsumerOffsetManager().queryOffset( this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
...@@ -140,7 +152,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -140,7 +152,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.getQueueId()); requestHeader.getQueueId());
if (minOffset <= 0 if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { requestHeader.getTopic(), requestHeader.getQueueId(), 0)
&& mappingContext.checkIfAsPhysical()) {
responseHeader.setOffset(0L); responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
......
...@@ -80,6 +80,8 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -80,6 +80,8 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
...@@ -101,50 +103,26 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -101,50 +103,26 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
response.setCode(code);
response.setRemark(remark);
return response;
}
private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
return null;
}
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
return null;
}
String topic = requestHeader.getTopic();
Integer globalId = requestHeader.getQueueId();
Long globalOffset = requestHeader.getQueueOffset();
LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
}
private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try { try {
if (mappingContext == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
String topic = mappingContext.getTopic(); String topic = mappingContext.getTopic();
Integer globalId = mappingContext.getGlobalId(); Integer globalId = mappingContext.getGlobalId();
Long globalOffset = mappingContext.getGlobalOffset(); Long globalOffset = mappingContext.getGlobalOffset();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null) { if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
topic, globalId, this.brokerController.getBrokerConfig().getBrokerName()));
} }
if (globalOffset < mappingItem.getStartOffset()) { if (globalOffset < mappingItem.getStartOffset()) {
log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset()); log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s", return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s",
topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName())); topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname()));
} }
//below are physical info //below are physical info
String bname = mappingItem.getBname(); String bname = mappingItem.getBname();
...@@ -157,7 +135,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -157,7 +135,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
} }
if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) { if (mappingDetail.getBname().equals(bname)) {
//just let it go //just let it go
return null; return null;
} }
...@@ -278,7 +256,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -278,7 +256,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset());
{ {
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
......
...@@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
...@@ -53,6 +54,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; ...@@ -53,6 +54,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
...@@ -63,6 +65,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; ...@@ -63,6 +65,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList; private List<ConsumeMessageHook> consumeMessageHookList;
...@@ -99,8 +103,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -99,8 +103,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) { if (requestHeader == null) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult); return CompletableFuture.completedFuture(rewriteResult);
} }
...@@ -115,68 +119,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -115,68 +119,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(code);
response.setRemark(remark);
return response;
}
private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
return null;
}
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
return null;
}
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null);
}
/** /**
* If the response is not null, it meets some errors * If the response is not null, it meets some errors
* @param requestHeader
* @return * @return
*/ */
private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
Integer phyQueueId = null;
//compatible with the old logic, but it fact, this should not happen
if (requestHeader.getQueueId() < 0) {
Iterator<Map.Entry<Integer, Integer>> it = mappingDetail.getCurrIdMap().entrySet().iterator();
if (it.hasNext()) {
phyQueueId = it.next().getValue();
}
} else {
phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
}
if (phyQueueId == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
} else {
requestHeader.setQueueId(phyQueueId);
return null;
}
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
private RemotingCommand rewriteResponseForStaticTopic(SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
try { try {
if (mappingContext == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
//no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) { if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
responseHeader.setQueueId(mappingContext.getGlobalId()); responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset); responseHeader.setQueueOffset(staticLogicOffset);
...@@ -626,7 +589,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -626,7 +589,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt); responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); RemotingCommand rewriteResult = rewriteResponseForStaticTopic(responseHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
......
...@@ -17,21 +17,32 @@ ...@@ -17,21 +17,32 @@
package org.apache.rocketmq.broker.topic; package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class TopicQueueMappingManager extends ConfigManager { public class TopicQueueMappingManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final long LOCK_TIMEOUT_MILLIS = 3000;
...@@ -94,5 +105,75 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -94,5 +105,75 @@ public class TopicQueueMappingManager extends ConfigManager {
return dataVersion; return dataVersion;
} }
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE);
}
//Do not return a null context
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss, Long globalOffset) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
}
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
}
//If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
}
if (globalId < 0) {
try {
if (!mappingDetail.getHostedQueues().isEmpty()) {
//do not check
globalId = mappingDetail.getHostedQueues().keySet().iterator().next();
}
} catch (Throwable ignored) {
}
}
if (globalId < 0) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
}
ImmutableList<LogicQueueMappingItem> mappingItemList = null;
LogicQueueMappingItem mappingItem = null;
if (globalOffset == null
|| Long.MAX_VALUE == globalOffset) {
mappingItemList = mappingDetail.getMappingInfo(globalId);
if (mappingItemList != null
&& mappingItemList.size() > 0) {
mappingItem = mappingItemList.get(mappingItemList.size() - 1);
}
} else {
mappingItemList = mappingDetail.getMappingInfo(globalId);
mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
}
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
}
public RemotingCommand rewriteRequestForStaticTopic(TopicQueueRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
}
requestHeader.setQueueId(mappingItem.getQueueId());
return null;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
} }
...@@ -16,21 +16,31 @@ ...@@ -16,21 +16,31 @@
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
public class TopicQueueMappingContext { public class TopicQueueMappingContext {
private String topic; private String topic;
private Integer globalId; private Integer globalId;
private Long globalOffset; private Long globalOffset;
private TopicQueueMappingDetail mappingDetail; private TopicQueueMappingDetail mappingDetail;
private ImmutableList<LogicQueueMappingItem> mappingItemList;
private LogicQueueMappingItem mappingItem; private LogicQueueMappingItem mappingItem;
public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) { public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
this.topic = topic; this.topic = topic;
this.globalId = globalId; this.globalId = globalId;
this.globalOffset = globalOffset; this.globalOffset = globalOffset;
this.mappingDetail = mappingDetail; this.mappingDetail = mappingDetail;
this.mappingItemList = mappingItemList;
this.mappingItem = mappingItem; this.mappingItem = mappingItem;
} }
public boolean checkIfAsPhysical() {
return mappingDetail == null
|| mappingItemList == null
|| (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0);
}
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
...@@ -63,6 +73,14 @@ public class TopicQueueMappingContext { ...@@ -63,6 +73,14 @@ public class TopicQueueMappingContext {
this.mappingDetail = mappingDetail; this.mappingDetail = mappingDetail;
} }
public ImmutableList<LogicQueueMappingItem> getMappingItemList() {
return mappingItemList;
}
public void setMappingItemList(ImmutableList<LogicQueueMappingItem> mappingItemList) {
this.mappingItemList = mappingItemList;
}
public LogicQueueMappingItem getMappingItem() { public LogicQueueMappingItem getMappingItem() {
return mappingItem; return mappingItem;
} }
......
...@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; ...@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver // the mapping info in current broker, do not register to nameserver
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
...@@ -77,30 +78,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -77,30 +78,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return tmpIdMap; return tmpIdMap;
} }
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) { public ImmutableList<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId); return hostedQueues.get(globalId);
} }
public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
}
if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
}
//Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
}
return -1;
}
public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId); public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList<LogicQueueMappingItem> mappingItems, long logicOffset) {
if (mappingItems == null if (mappingItems == null
|| mappingItems.isEmpty()) { || mappingItems.isEmpty()) {
return null; return null;
...@@ -143,4 +128,11 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -143,4 +128,11 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() { public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
return hostedQueues; return hostedQueues;
} }
public boolean checkIfAsPhysical(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
return mappingItems == null
|| (mappingItems.size() == 1
&& mappingItems.get(0).getLogicOffset() == 0);
}
} }
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.RequestHeader; import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageRequestHeader extends RequestHeader { public class PullMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull @CFNotNull
private String consumerGroup; private String consumerGroup;
@CFNotNull @CFNotNull
...@@ -60,18 +60,22 @@ public class PullMessageRequestHeader extends RequestHeader { ...@@ -60,18 +60,22 @@ public class PullMessageRequestHeader extends RequestHeader {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
} }
@Override
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
@Override
public void setTopic(String topic) { public void setTopic(String topic) {
this.topic = topic; this.topic = topic;
} }
@Override
public Integer getQueueId() { public Integer getQueueId() {
return queueId; return queueId;
} }
@Override
public void setQueueId(Integer queueId) { public void setQueueId(Integer queueId) {
this.queueId = queueId; this.queueId = queueId;
} }
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader { public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull @CFNotNull
private String consumerGroup; private String consumerGroup;
@CFNotNull @CFNotNull
...@@ -44,18 +44,22 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader { ...@@ -44,18 +44,22 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
} }
@Override
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
@Override
public void setTopic(String topic) { public void setTopic(String topic) {
this.topic = topic; this.topic = topic;
} }
@Override
public Integer getQueueId() { public Integer getQueueId() {
return queueId; return queueId;
} }
@Override
public void setQueueId(Integer queueId) { public void setQueueId(Integer queueId) {
this.queueId = queueId; this.queueId = queueId;
} }
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.RequestHeader; import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class SendMessageRequestHeader extends RequestHeader { public class SendMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull @CFNotNull
private String producerGroup; private String producerGroup;
@CFNotNull @CFNotNull
...@@ -64,10 +64,12 @@ public class SendMessageRequestHeader extends RequestHeader { ...@@ -64,10 +64,12 @@ public class SendMessageRequestHeader extends RequestHeader {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
} }
@Override
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
@Override
public void setTopic(String topic) { public void setTopic(String topic) {
this.topic = topic; this.topic = topic;
} }
...@@ -88,10 +90,12 @@ public class SendMessageRequestHeader extends RequestHeader { ...@@ -88,10 +90,12 @@ public class SendMessageRequestHeader extends RequestHeader {
this.defaultTopicQueueNums = defaultTopicQueueNums; this.defaultTopicQueueNums = defaultTopicQueueNums;
} }
@Override
public Integer getQueueId() { public Integer getQueueId() {
return queueId; return queueId;
} }
@Override
public void setQueueId(Integer queueId) { public void setQueueId(Integer queueId) {
this.queueId = queueId; this.queueId = queueId;
} }
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader { public class UpdateConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull @CFNotNull
private String consumerGroup; private String consumerGroup;
@CFNotNull @CFNotNull
...@@ -46,18 +46,22 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader { ...@@ -46,18 +46,22 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
} }
@Override
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
@Override
public void setTopic(String topic) { public void setTopic(String topic) {
this.topic = topic; this.topic = topic;
} }
@Override
public Integer getQueueId() { public Integer getQueueId() {
return queueId; return queueId;
} }
@Override
public void setQueueId(Integer queueId) { public void setQueueId(Integer queueId) {
this.queueId = queueId; this.queueId = queueId;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
public abstract class TopicQueueRequestHeader extends RequestHeader {
public abstract String getTopic();
public abstract void setTopic(String topic);
public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId);
}
...@@ -110,6 +110,13 @@ public class RemotingCommand { ...@@ -110,6 +110,13 @@ public class RemotingCommand {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
} }
public static RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(code);
response.setRemark(remark);
return response;
}
public static RemotingCommand createResponseCommand(int code, String remark, public static RemotingCommand createResponseCommand(int code, String remark,
Class<? extends CommandCustomHeader> classHeader) { Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册