提交 456691bf 编写于 作者: D dongeforever

Add IT tests for dleger commitlog

上级 5f730028
......@@ -21,7 +21,7 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
@Override public void handle(long term, MemberState.Role role) {
try {
log.info("Begin handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
......@@ -41,9 +41,9 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
default:
break;
}
log.info("Finish handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
log.info("Finish handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
} catch (Throwable t) {
log.info("Failed handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t);
log.info("Failed handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t);
}
}
}
......@@ -132,8 +132,11 @@ public class DefaultMessageStore implements MessageStore {
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
......@@ -60,4 +61,11 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start();
return defaultMQPullConsumer;
}
}
......@@ -33,7 +33,7 @@ import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
public class BaseConf {
protected static String nsAddr;
public static String nsAddr;
protected static String broker1Name;
protected static String broker2Name;
protected static String clusterName;
......
......@@ -50,6 +50,11 @@ public class IntegrationTestBase {
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256;
protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(50000);
public static synchronized int nextPort() {
return port.addAndGet(5);
}
protected static Random random = new Random();
static {
......@@ -87,7 +92,7 @@ public class IntegrationTestBase {
}
private static String createBaseDir() {
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
......@@ -112,7 +117,7 @@ public class IntegrationTestBase {
logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
namesrvController.start();
} catch (Exception e) {
logger.info("Name Server start failed");
logger.info("Name Server start failed", e);
System.exit(1);
}
NAMESRV_CONTROLLERS.add(namesrvController);
......@@ -123,8 +128,6 @@ public class IntegrationTestBase {
public static BrokerController createAndStartBroker(String nsAddr) {
String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
brokerConfig.setBrokerIP1("127.0.0.1");
......@@ -132,18 +135,25 @@ public class IntegrationTestBase {
brokerConfig.setEnablePropertyFilter(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000));
storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMapedFileSizeCommitLog(100 * 1024 * 1024);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
return createAndStartBroker(storeConfig, brokerConfig);
}
public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(nextPort());
storeConfig.setHaListenPort(nextPort());
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {
Assert.assertTrue(brokerController.initialize());
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
} catch (Exception e) {
logger.info("Broker start failed");
} catch (Throwable t) {
logger.error("Broker start failed, will exit", t);
System.exit(1);
}
BROKER_CONTROLLERS.add(brokerController);
......
package org.apache.rocketmq.test.base.dleger;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort;
import static sun.util.locale.BaseLocale.SEP;
public class ProduceAndConsumeTest {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(cluster);
brokerConfig.setBrokerName(brokerName);
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(BaseConf.nsAddr);
return brokerConfig;
}
public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
String baseDir = IntegrationTestBase.createBaseDir();
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(nextPort());
storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(brokerName);
storeConfig.setdLegerSelfId(selfId);
storeConfig.setdLegerPeers(peers);
return storeConfig;
}
@Test
public void testProduceAndConsume() throws Exception {
String cluster = UUID.randomUUID().toString();
String brokerName = UUID.randomUUID().toString();
String selfId = "n0";
String peers = String.format("n0-localhost:%d", nextPort());
BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);
BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
Thread.sleep(1000);
Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());
String topic = UUID.randomUUID().toString();
String consumerGroup = UUID.randomUUID().toString();
IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic(topic);
message.setBody(("Hello" + i).getBytes());
SendResult sendResult = producer.send(message);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
Assert.assertEquals(0, sendResult.getMessageQueue().getQueueId());
Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName());
Assert.assertEquals(i, sendResult.getQueueOffset());
Assert.assertNotNull(sendResult.getMsgId());
Assert.assertNotNull(sendResult.getOffsetMsgId());
}
Thread.sleep(500);
Assert.assertEquals(0, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
PullResult pullResult= consumer.pull(messageQueue, "*", 0, 32);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
Assert.assertEquals(10, pullResult.getMsgFoundList().size());
for (int i = 0; i < 10; i++) {
MessageExt messageExt = pullResult.getMsgFoundList().get(i);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody());
}
brokerController.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册