提交 5e04a278 编写于 作者: D dongeforever

Finish the rewrite topic for pull and send process

上级 36a82fa6
......@@ -236,7 +236,7 @@ public class BrokerController {
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this);
this.filterServerManager = new FilterServerManager(this);
this.assignmentManager = new AssignmentManager(this);
......
......@@ -23,8 +23,12 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
......@@ -41,6 +45,8 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
......@@ -66,18 +72,20 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
private final BrokerController brokerController;
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) {
this(nettyClientConfig, null, brokerController);
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
this.brokerController = brokerController;
}
public void start() {
......@@ -454,4 +462,20 @@ public class BrokerOuterAPI {
public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
public RemotingCommand pullMessage(String brokerName, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(brokerName);
if (addr == null) {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), brokerName, this.brokerController.getBrokerConfig().getBrokerName()));
return response;
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return response;
}
}
......@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
......@@ -35,9 +36,15 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
......@@ -49,6 +56,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
......@@ -95,6 +104,132 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return false;
}
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
response.setCode(code);
response.setRemark(remark);
return response;
}
private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
return null;
}
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
return null;
}
String topic = requestHeader.getTopic();
Integer globalId = requestHeader.getQueueId();
Long globalOffset = requestHeader.getQueueOffset();
LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset);
return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
}
private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext == null) {
return null;
}
String topic = mappingContext.getTopic();
Integer globalId = mappingContext.getGlobalId();
Long globalOffset = mappingContext.getGlobalOffset();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s",
topic, globalId, this.brokerController.getBrokerConfig().getBrokerName()));
}
if (globalOffset < mappingItem.getStartOffset()) {
log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s",
topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName()));
}
//below are physical info
String bname = mappingItem.getBname();
Integer phyQueueId = mappingItem.getQueueId();
Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset);
requestHeader.setQueueId(phyQueueId);
requestHeader.setQueueOffset(phyQueueOffset);
if (mappingItem.isEndOffsetDecided()
&& requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
}
if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) {
//just let it go
return null;
}
requestHeader.setPhysical(true);
RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout());
switch (response.getCode()) {
case ResponseCode.SYSTEM_ERROR:
return response;
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname());
}
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
{
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
return response;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
//handle nextBeginOffset
{
long nextBeginOffset = responseHeader.getNextBeginOffset();
assert nextBeginOffset >= requestHeader.getQueueOffset();
//the next begin offset should no more than the end offset
if (mappingItem.isEndOffsetDecided()
&& nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset();
}
responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset));
}
//handle min offset
responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset
{
if (mappingItem.isEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else {
responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset()));
}
}
//set the offsetDelta
responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta());
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
return null;
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
......@@ -147,6 +282,15 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
{
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
int queueId = requestHeader.getQueueId();
if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
......@@ -395,6 +539,15 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
break;
}
//rewrite the response for the
{
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
......@@ -507,31 +660,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
OffsetMovedEvent event = new OffsetMovedEvent();
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(nextBeginOffset);
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
handleOffsetMoved(requestHeader, responseHeader, response, nextBeginOffset, subscriptionGroupConfig);
break;
default:
assert false;
......@@ -552,6 +681,35 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
public void handleOffsetMoved(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, RemotingCommand response,
long nextBeginOffset,
SubscriptionGroupConfig subscriptionGroupConfig) {
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
OffsetMovedEvent event = new OffsetMovedEvent();
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(nextBeginOffset);
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
}
private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo,
LogicalQueueRouteData queueRouteData) {
LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable);
......
......@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
......@@ -98,38 +99,53 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader);
TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader, mappingContext);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader, mappingContext);
}
}
}
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(code);
response.setRemark(remark);
return response;
}
private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
return null;
}
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
return null;
}
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null);
}
/**
* If the response is not null, it meets some errors
* @param requestHeader
* @return
*/
private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
if (mappingContext == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
Integer phyQueueId = null;
//compatible with the old logic, but it fact, this should not happen
if (requestHeader.getQueueId() < 0) {
......@@ -151,22 +167,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
}
private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) {
private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
if (mappingDetail == null) {
if (mappingContext == null) {
return null;
}
Integer globalId = mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
if (globalId == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exist in response process of current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
long staticLogicOffset = mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
responseHeader.setQueueId(globalId);
responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
......@@ -332,7 +344,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
SendMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
......@@ -392,7 +405,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt, requestHeader, mappingContext);
}
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,
......@@ -402,9 +415,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
SendMessageResponseHeader responseHeader,
SendMessageContext sendMessageContext,
ChannelHandlerContext ctx,
int queueIdInt) {
int queueIdInt,
SendMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
return putMessageResult.thenApply((r) ->
handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt)
handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, mappingContext)
);
}
......@@ -456,7 +471,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
......@@ -525,14 +541,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, mappingContext);
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
int queueIdInt, SendMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
......@@ -609,7 +626,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
......@@ -647,7 +664,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
SendMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
......@@ -689,7 +707,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt, requestHeader, mappingContext);
}
......
......@@ -7,6 +7,7 @@ public class LogicQueueMappingItem {
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset
private long endOffset; // the end of the physical offset
private long timeOfStart = -1; //mutable
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
......@@ -18,8 +19,32 @@ public class LogicQueueMappingItem {
this.timeOfStart = timeOfStart;
}
public long convertToStaticLogicOffset(long physicalLogicOffset) {
return logicOffset + (physicalLogicOffset - startOffset);
public long convertToStaticQueueOffset(long physicalQueueOffset) {
return logicOffset + (physicalQueueOffset - startOffset);
}
public long convertToPhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset;
}
public long convertToMaxStaticQueueOffset() {
if (endOffset >= startOffset) {
return logicOffset + endOffset - startOffset;
} else {
return logicOffset;
}
}
public boolean isShouldDeleted() {
return endOffset == startOffset;
}
public boolean isEndOffsetDecided() {
//if the endOffset == startOffset, then the item should be deleted
return endOffset > startOffset;
}
public long convertOffsetDelta() {
return logicOffset - startOffset;
}
public int getGen() {
......@@ -55,4 +80,13 @@ public class LogicQueueMappingItem {
public long getStartOffset() {
return startOffset;
}
public long getEndOffset() {
return endOffset;
}
public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
public class TopicQueueMappingContext {
private String topic;
private Integer globalId;
private Long globalOffset;
private TopicQueueMappingDetail mappingDetail;
private LogicQueueMappingItem mappingItem;
public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) {
this.topic = topic;
this.globalId = globalId;
this.globalOffset = globalOffset;
this.mappingDetail = mappingDetail;
this.mappingItem = mappingItem;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getGlobalId() {
return globalId;
}
public void setGlobalId(Integer globalId) {
this.globalId = globalId;
}
public Long getGlobalOffset() {
return globalOffset;
}
public void setGlobalOffset(Long globalOffset) {
this.globalOffset = globalOffset;
}
public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail;
}
public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
this.mappingDetail = mappingDetail;
}
public LogicQueueMappingItem getMappingItem() {
return mappingItem;
}
public void setMappingItem(LogicQueueMappingItem mappingItem) {
this.mappingItem = mappingItem;
}
}
......@@ -27,9 +27,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/> currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
......@@ -48,7 +45,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
this.currIdMapRevert = revert(this.currIdMap);
}
public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
......@@ -103,16 +99,48 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return -1;
}
if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
return mappingItems.get(mappingItems.size() - 1).convertToStaticLogicOffset(physicalLogicOffset);
return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset);
}
//Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
return mappingItems.get(mappingItems.size() - 2).convertToStaticLogicOffset(physicalLogicOffset);
return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset);
}
return -1;
}
public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return null;
}
//Could use bi-search to polish performance
for (int i = mappingItems.size() - 1; i >= 0; i--) {
LogicQueueMappingItem item = mappingItems.get(i);
if (logicOffset >= item.getLogicOffset()) {
return item;
}
}
//if not found, maybe out of range, return the first one
for (int i = 0; i < mappingItems.size(); i++) {
if (!mappingItems.get(i).isShouldDeleted()) {
return mappingItems.get(i);
}
}
return null;
}
public long getMaxOffsetFromMapping(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
}
LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1);
return item.convertToMaxStaticQueueOffset();
}
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
......@@ -122,13 +150,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return topicQueueMappingInfo;
}
public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
return currIdMapRevert;
}
public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer> currIdMapRevert) {
this.currIdMapRevert = currIdMapRevert;
}
public int getTotalQueues() {
return totalQueues;
......
......@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageRequestHeader implements CommandCustomHeader {
public class PullMessageRequestHeader extends RequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
......
......@@ -22,6 +22,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageResponseHeader implements CommandCustomHeader {
......@@ -33,6 +34,8 @@ public class PullMessageResponseHeader implements CommandCustomHeader {
private Long minOffset;
@CFNotNull
private Long maxOffset;
@CFNullable
private Long offsetDelta;
@Override
public void checkFields() throws RemotingCommandException {
......@@ -69,4 +72,12 @@ public class PullMessageResponseHeader implements CommandCustomHeader {
public void setSuggestWhichBrokerId(Long suggestWhichBrokerId) {
this.suggestWhichBrokerId = suggestWhichBrokerId;
}
public Long getOffsetDelta() {
return offsetDelta;
}
public void setOffsetDelta(Long offsetDelta) {
this.offsetDelta = offsetDelta;
}
}
......@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class SendMessageRequestHeader implements CommandCustomHeader {
public class SendMessageRequestHeader extends RequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
public abstract class RequestHeader implements CommandCustomHeader {
protected Boolean physical;
public Boolean getPhysical() {
return physical;
}
public void setPhysical(Boolean physical) {
this.physical = physical;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册