提交 ed2d6267 编写于 作者: D dongeforever

Add a basic produce test for static topic

上级 c1d4b8b5
......@@ -767,14 +767,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
//TODO check if it is leader
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
requestHeader.setQueueId(mappingItem.getQueueId());
long physicalOffset;
//run in local
if (mappingItem.getBname().equals(mappingDetail.getBname())) {
physicalOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(mappingDetail.getTopic(), mappingItem.getQueueId());
} else {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
physicalOffset = offsetResponseHeader.getOffset();
}
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
......
......@@ -725,6 +725,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
......
......@@ -132,6 +132,12 @@ public class RMQNormalProducer extends AbstractMQProducer {
}
}
public void send(int num, MessageQueue mq) {
for (int i = 0; i < num; i++) {
sendMQ((Message) getMessageByTag(null), mq);
}
}
public ResultWrapper sendMQ(Message msg, MessageQueue mq) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
try {
......@@ -145,6 +151,9 @@ public class RMQNormalProducer extends AbstractMQProducer {
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(msg.getBody()));
if (originMsgs.getAllData().contains(msg)) {
System.out.println("Hash collision");
}
originMsgs.addData(msg);
originMsgIndex.put(new String(msg.getBody()), metaqResult);
} catch (Exception e) {
......
......@@ -34,6 +34,7 @@ public abstract class AbstractMQProducer extends MQCollector implements MQProduc
protected String producerInstanceName = null;
protected boolean isDebug = false;
public AbstractMQProducer(String topic) {
super();
producerGroupName = RandomUtil.getStringByUUID();
......
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
......@@ -129,6 +130,7 @@ public class BaseConf {
return mqAdminExt;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
......
package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.After;
......@@ -17,6 +20,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -61,6 +65,7 @@ public class StaticTopicIT extends BaseConf {
@Test
public void testCreateAndRemappingStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
int queueNum = 10;
Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
{
......@@ -77,6 +82,26 @@ public class StaticTopicIT extends BaseConf {
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size());
producer.setDebug(true);
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(topic, messageQueue.getTopic());
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(100, messageQueue);
}
//leave the time to build the cq
Thread.sleep(500);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(100, defaultMQAdminExt.maxOffset(messageQueue));
}
Assert.assertEquals(100 * queueNum, producer.getAllOriginMsg().size());
Assert.assertEquals(0, producer.getSendErrorMsg().size());
/*{
Set<String> targetBrokers = Collections.singleton(broker1Name);
Map<String, TopicConfigAndQueueMapping> brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
......
......@@ -215,6 +215,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
//add the existed brokers to target brokers
targetBrokers.addAll(brokerConfigMap.keySet());
//calculate the new data
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册