提交 6f2b1208 编写于 作者: D dongeforever

Finish the basic create,produce,consume of the static topic

上级 ed2d6267
......@@ -16,18 +16,13 @@
*/
package org.apache.rocketmq.broker.processor;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
......@@ -36,18 +31,14 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -56,18 +47,19 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -75,7 +67,6 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
......@@ -83,6 +74,9 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.nio.ByteBuffer;
import java.util.List;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
......@@ -122,10 +116,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
}
if (globalOffset < mappingItem.getStartOffset()) {
log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s",
topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname()));
//TODO Check if the leader? consider the order consumer, which will lock the mq
//
if (globalOffset < mappingItem.getLogicOffset()) {
//handleOffsetMoved
//If the physical queue is reused, we should handle the PULL_OFFSET_MOVED independently
//Otherwise, we could just transfer it to the physical process
}
//below are physical info
String bname = mappingItem.getBname();
......@@ -139,7 +136,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
if (mappingDetail.getBname().equals(bname)) {
//just let it go
//just let it go, do the local pull process
return null;
}
......@@ -185,13 +182,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//handle min offset
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset
{
if (mappingItem.checkIfEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
} else {
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
}
}
responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
//set the offsetDelta
responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
} catch (Throwable t) {
......@@ -238,18 +230,17 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", topic, RemotingHelper.parseChannelRemoteAddr(channel));
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", topic, FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + topic + "] pulling message is forbidden");
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}
......@@ -262,10 +253,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
}
int queueId = requestHeader.getQueueId();
if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
queueId, topic, topicConfig.getReadQueueNums(), channel.remoteAddress());
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
......@@ -277,11 +267,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(
topic, requestHeader.getSubscription(), requestHeader.getExpressionType()
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
topic, requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
......@@ -310,9 +300,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
subscriptionData = consumerGroupInfo.findSubscriptionData(topic);
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), topic);
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
......@@ -326,7 +316,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(topic,
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
......@@ -335,7 +325,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), topic, consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
......@@ -359,72 +349,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getConsumerFilterManager());
}
long offset = requestHeader.getQueueOffset();
int maxMsgNums = requestHeader.getMaxMsgNums();
LogicalQueuesInfoInBroker logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
LogicalQueueRouteData queueRouteData = null;
if (logicalQueuesInfo != null) {
int responseErrorCode = ResponseCode.SUCCESS;
queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, offset);
if (queueRouteData != null) {
if (queueRouteData.isWriteOnly()) {
responseErrorCode = ResponseCode.PULL_NOT_FOUND;
response.setRemark("logical queue write only");
} else if (queueRouteData.isExpired()) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("logical queue expired");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else if (MessageQueueRouteState.ReadOnly.equals(queueRouteData.getState()) && queueRouteData.getOffsetMax() >= 0) {
if (offset >= queueRouteData.getOffsetMax()) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset exceed offsetMax");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else if (offset + maxMsgNums > queueRouteData.getOffsetMax()) {
if ((queueRouteData.getOffsetMax() - 1 <= this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId)) &&
(this.brokerController.getMessageStore().getCommitLogOffsetInQueue(topic, queueId, queueRouteData.getOffsetMax() - 1) < this.brokerController.getMessageStore().getMinPhyOffset())) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset removed");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else {
maxMsgNums = (int) (queueRouteData.getOffsetMax() - offset);
if (maxMsgNums <= 0) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset out of range");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
}
}
}
}
} else {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("no suitable queue");
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
// instruct client to refresh all
response.setBody(null);
queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, 0L);
}
if (responseErrorCode != ResponseCode.SUCCESS) {
response.setCode(responseErrorCode);
responseHeader.setMinOffset(offset);
responseHeader.setMaxOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset);
responseHeader.setNextBeginOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
return response;
}
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic,
queueId, offset, maxMsgNums, messageFilter);
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
long nextBeginOffset = getMessageResult.getNextBeginOffset();
if (queueRouteData != null && queueRouteData.getOffsetMax() >= 0 && nextBeginOffset > queueRouteData.getOffsetMax()) {
// prevent from pulling messages from next logical queue route data
nextBeginOffset = queueRouteData.getOffsetMax();
}
responseHeader.setNextBeginOffset(nextBeginOffset);
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
// this does not need to be modified since it's not an accurate value under logical queue.
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
......@@ -475,9 +405,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
nextBeginOffset,
topic,
queueId,
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
......@@ -502,7 +432,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), topic, requestHeader.getQueueOffset(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
......@@ -522,8 +452,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
context.setTopic(topic);
context.setQueueId(queueId);
context.setTopic(requestHeader.getTopic());
context.setQueueId(requestHeader.getQueueId());
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
......@@ -607,6 +537,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
......@@ -614,47 +547,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
break;
}
if (queueRouteData != null) {
logicalQueuesInfo.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(queueRouteData.getLogicalQueueIndex());
MessageQueue latestMessageQueue = queueRouteDataList.get(queueRouteDataList.size() - 1).getMessageQueue();
if (!latestMessageQueue.getBrokerName().equals(brokerController.getBrokerConfig().getBrokerName()) || latestMessageQueue.getQueueId() != queueId) {
// There are other newer message queue, instruct client to refresh meta-data to access these
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
}
} finally {
logicalQueuesInfo.readLock().unlock();
}
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
handleOffsetMoved(requestHeader, responseHeader, response, nextBeginOffset, subscriptionGroupConfig);
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), topic, queueId, requestHeader.getCommitOffset());
}
return response;
}
public void handleOffsetMoved(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, RemotingCommand response,
long nextBeginOffset,
SubscriptionGroupConfig subscriptionGroupConfig) {
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
......@@ -666,7 +561,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(nextBeginOffset);
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
......@@ -679,15 +574,25 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo,
LogicalQueueRouteData queueRouteData) {
LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable);
if (nextReadableLogicalQueueRouteData != null) {
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
response.setBody(RemotingSerializable.encode(ImmutableList.of(queueRouteData, nextReadableLogicalQueueRouteData)));
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
public boolean hasConsumeMessageHook() {
......
......@@ -26,6 +26,7 @@ public class PullResult {
private final long maxOffset;
private List<MessageExt> msgFoundList;
public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
List<MessageExt> msgFoundList) {
super();
......
......@@ -973,7 +973,7 @@ public class MQClientAPIImpl {
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
}
private PopResult processPopResponse(final String brokerName, final RemotingCommand response, String topic,
......
......@@ -107,6 +107,10 @@ public class PullAPIWrapper {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
msg.setBrokerName(mq.getBrokerName());
msg.setQueueId(mq.getQueueId());
if (pullResultExt.getOffsetDelta() != null) {
msg.setQueueOffset(pullResultExt.getOffsetDelta() + msg.getQueueOffset());
}
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
......
......@@ -25,11 +25,23 @@ public class PullResultExt extends PullResult {
private final long suggestWhichBrokerId;
private byte[] messageBinary;
private final Long offsetDelta;
public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
this(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList, suggestWhichBrokerId, messageBinary, 0L);
}
public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary, final Long offsetDelta) {
super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
this.suggestWhichBrokerId = suggestWhichBrokerId;
this.messageBinary = messageBinary;
this.offsetDelta = offsetDelta;
}
public Long getOffsetDelta() {
return offsetDelta;
}
public byte[] getMessageBinary() {
......
......@@ -2,6 +2,9 @@ package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
......@@ -9,8 +12,11 @@ import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.After;
import org.junit.Assert;
......@@ -19,11 +25,15 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
@FixMethodOrder
......@@ -63,11 +73,16 @@ public class StaticTopicIT extends BaseConf {
}
@Test
public void testCreateAndRemappingStaticTopic() throws Exception {
public void testCreateProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
//check the static topic config
{
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertEquals(2, remoteBrokerConfigMap.size());
......@@ -82,6 +97,7 @@ public class StaticTopicIT extends BaseConf {
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
//check the route data
List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size());
producer.setDebug(true);
......@@ -91,23 +107,45 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
//send and consume the msg
for(MessageQueue messageQueue: messageQueueList) {
producer.send(100, messageQueue);
producer.send(msgEachQueue, messageQueue);
}
//leave the time to build the cq
Thread.sleep(500);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(100, defaultMQAdminExt.maxOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
Assert.assertEquals(100 * queueNum, producer.getAllOriginMsg().size());
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
Assert.assertEquals(0, producer.getSendErrorMsg().size());
/*{
Set<String> targetBrokers = Collections.singleton(broker1Name);
Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMapFromRemote, targetBrokers);
}*/
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
for (Object object : consumer.getListener().getAllOriginMsg()) {
MessageExt messageExt = (MessageExt) object;
if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
}
messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
}
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override
public int compare(MessageExt o1, MessageExt o2) {
return (int) (o1.getQueueOffset() - o2.getQueueOffset());
}
});
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册