提交 fad17e1e 编写于 作者: D dongeforever

Finish the logic for double-read-check

上级 ae7b6751
...@@ -709,6 +709,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -709,6 +709,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset); responseHeader.setOffset(offset);
......
...@@ -20,8 +20,14 @@ import io.netty.channel.ChannelHandlerContext; ...@@ -20,8 +20,14 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List; import java.util.List;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -39,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; ...@@ -39,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -111,6 +119,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -111,6 +119,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(UpdateConsumerOffsetRequestHeader) request (UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
...@@ -122,6 +131,76 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -122,6 +131,76 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response; return response;
} }
public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
}
if (mappingContext.checkIfAsPhysical()) {
//let it go
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
return null;
}
//double read check
List<LogicQueueMappingItem> itemList = mappingContext.getMappingItemList();
//by default, it is -1
long offset = -1;
//double read, first from leader, then from second leader
for (int i = 1; i <= 2; i++) {
if (itemList.size() - i < 0) {
break;
}
LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
if (mappingItem.getBname().equals(mappingDetail.getBname())) {
offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
if (offset >= 0) {
break;
} else {
//not found
continue;
}
} else {
//maybe we need to reconstruct an object
requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
} else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){
continue;
} else {
//this should not happen
throw new RuntimeException("Unknown response code " + rpcResponse.getCode());
}
}
}
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, maybe this group consumer boot first");
}
return response;
} catch (Throwable t) {
t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException { throws RemotingCommandException {
final RemotingCommand response = final RemotingCommand response =
...@@ -132,8 +211,9 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -132,8 +211,9 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(QueryConsumerOffsetRequestHeader) request (QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
...@@ -152,8 +232,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -152,8 +232,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.getQueueId()); requestHeader.getQueueId());
if (minOffset <= 0 if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0) requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
&& mappingContext.checkIfAsPhysical()) {
responseHeader.setOffset(0L); responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
......
...@@ -196,9 +196,8 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -196,9 +196,8 @@ public class TopicQueueMappingManager extends ConfigManager {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
} }
List<LogicQueueMappingItem> mappingItemList = null; List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
LogicQueueMappingItem leaderItem = null; LogicQueueMappingItem leaderItem = null;
mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
if (mappingItemList != null if (mappingItemList != null
&& mappingItemList.size() > 0) { && mappingItemList.size() > 0) {
leaderItem = mappingItemList.get(mappingItemList.size() - 1); leaderItem = mappingItemList.get(mappingItemList.size() - 1);
......
...@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap; ...@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
...@@ -94,7 +96,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -94,7 +96,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return brokerOffset; return brokerOffset;
} }
// No offset in broker // No offset in broker
catch (MQBrokerException e) { catch (OffsetNotFoundException e) {
return -1; return -1;
} }
//Other exceptions //Other exceptions
...@@ -108,7 +110,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -108,7 +110,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
} }
return -1; return -3;
} }
@Override @Override
......
package org.apache.rocketmq.client.exception;
public class OffsetNotFoundException extends MQBrokerException {
public OffsetNotFoundException() {
}
public OffsetNotFoundException(int responseCode, String errorMessage) {
super(responseCode, errorMessage);
}
public OffsetNotFoundException(int responseCode, String errorMessage, String brokerAddr) {
super(responseCode, errorMessage, brokerAddr);
}
}
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.PullStatus; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException; import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
...@@ -1232,9 +1233,11 @@ public class MQClientAPIImpl { ...@@ -1232,9 +1233,11 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader = QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset(); return responseHeader.getOffset();
} }
case ResponseCode.PULL_NOT_FOUND:{
throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
}
default: default:
break; break;
} }
......
...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.UtilAll; ...@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
......
...@@ -9,6 +9,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; ...@@ -9,6 +9,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
...@@ -75,6 +76,9 @@ public class RpcClientImpl implements RpcClient { ...@@ -75,6 +76,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.GET_EARLIEST_MSG_STORETIME: case RequestCode.GET_EARLIEST_MSG_STORETIME:
rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs); rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs);
break; break;
case RequestCode.QUERY_CONSUMER_OFFSET:
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break;
default: default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
} }
...@@ -176,6 +180,31 @@ public class RpcClientImpl implements RpcClient { ...@@ -176,6 +180,31 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise; return rpcResponsePromise;
} }
public Promise<RpcResponse> handleQueryConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
case ResponseCode.QUERY_NOT_FOUND: {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
}
default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
return rpcResponsePromise;
}
public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
......
...@@ -28,6 +28,7 @@ import org.junit.Test; ...@@ -28,6 +28,7 @@ import org.junit.Test;
import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess; import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
...@@ -144,30 +145,123 @@ public class StaticTopicIT extends BaseConf { ...@@ -144,30 +145,123 @@ public class StaticTopicIT extends BaseConf {
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
}
}
private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>(); Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
for (Object object : consumer.getListener().getAllOriginMsg()) { for (Object object : msgs) {
MessageExt messageExt = (MessageExt) object; MessageExt messageExt = (MessageExt) object;
if (!messagesByQueue.containsKey(messageExt.getQueueId())) { if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>()); messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
} }
messagesByQueue.get(messageExt.getQueueId()).add(messageExt); messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
} }
Assert.assertEquals(queueNum, messagesByQueue.size()); for (List<MessageExt> msgEachQueue: messagesByQueue.values()) {
for (int i = 0; i < queueNum; i++) { Collections.sort(msgEachQueue, new Comparator<MessageExt>() {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override @Override
public int compare(MessageExt o1, MessageExt o2) { public int compare(MessageExt o1, MessageExt o2) {
return (int) (o1.getQueueOffset() - o2.getQueueOffset()); return (int) (o1.getQueueOffset() - o2.getQueueOffset());
} }
}); });
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
} }
return messagesByQueue;
} }
@Test
public void testDoubleReadCheck() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
String group = initConsumerGroup();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
//System.out.printf("Group:%s\n", consumer.getConsumerGroup());
//System.out.printf("Topic:%s\n", topic);
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers);
}
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
}
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
producer.shutdown();
consumer.shutdown();
//remapping the static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
remappingStaticTopic(topic, targetBrokers);
}
//make the metadata
Thread.sleep(500);
//System.out.printf("Group:%s\n", consumer.getConsumerGroup());
{
producer = getProducer(nsAddr, topic);
//just refresh the metadata
defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
}
//leave the time to build the cq
Thread.sleep(100);
}
{
consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
//System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
}
}
}
}
@Test @Test
public void testRemappingProduceConsumeStaticTopic() throws Exception { public void testRemappingProduceConsumeStaticTopic() throws Exception {
...@@ -175,6 +269,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -175,6 +269,7 @@ public class StaticTopicIT extends BaseConf {
RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10; int queueNum = 10;
int msgEachQueue = 100; int msgEachQueue = 100;
//create static topic //create static topic
...@@ -254,24 +349,11 @@ public class StaticTopicIT extends BaseConf { ...@@ -254,24 +349,11 @@ public class StaticTopicIT extends BaseConf {
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>(); Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
for (Object object : consumer.getListener().getAllOriginMsg()) {
MessageExt messageExt = (MessageExt) object;
if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
}
messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
}
Assert.assertEquals(queueNum, messagesByQueue.size()); Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) { for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i); List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue * 2, messageExts.size()); Assert.assertEquals(msgEachQueue * 2, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override
public int compare(MessageExt o1, MessageExt o2) {
return (int) (o1.getQueueOffset() - o2.getQueueOffset());
}
});
for (int j = 0; j < msgEachQueue; j++) { for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册