diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java index 3d3b63a121fb7c39dc707dcd52c73b571faf4849..3fcf588b9bf7f939a216af32e7784df38f210b95 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java @@ -21,7 +21,7 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa @Override public void handle(long term, MemberState.Role role) { try { - log.info("Begin handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); switch (role) { case CANDIDATE: if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { @@ -41,9 +41,9 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa default: break; } - log.info("Finish handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + log.info("Finish handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); } catch (Throwable t) { - log.info("Failed handling lastRole change term={} lastRole={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t); + log.info("Failed handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t); } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index cd0c46def2795ec12a2bf9df12446426aef6abeb..8f5c4e80f41d01bd0f5d3799b0cf6c1a616b0717 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -132,8 +132,11 @@ public class DefaultMessageStore implements MessageStore { this.cleanConsumeQueueService = new CleanConsumeQueueService(); this.storeStatsService = new StoreStatsService(); this.indexService = new IndexService(this); - this.haService = new HAService(this); - + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + this.haService = new HAService(this); + } else { + this.haService = null; + } this.reputMessageService = new ReputMessageService(); this.scheduleMessageService = new ScheduleMessageService(this); diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java index 0f94f3606c547c9b5f4c3cf529ffb4e63f208cb0..c37cf1f96ba57c647ef951405f1aaad50e2fc2ec 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.factory; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; @@ -60,4 +61,11 @@ public class ConsumerFactory { consumer.start(); return consumer; } + + public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception { + DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); + defaultMQPullConsumer.setNamesrvAddr(nsAddr); + defaultMQPullConsumer.start(); + return defaultMQPullConsumer; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 5027a3cce070397c1d49a51891ef6b9b16538fed..1f4fe7175ececfb92e6ecb0f362820e434d0e758 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -33,7 +33,7 @@ import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQRandomUtils; public class BaseConf { - protected static String nsAddr; + public static String nsAddr; protected static String broker1Name; protected static String broker2Name; protected static String clusterName; diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 48d9a266e090b7a95d4d040c22aa64fd8a7a50d4..8890a587397ebeffcea5b125f0cb2a11d326deda 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -50,6 +50,11 @@ public class IntegrationTestBase { protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256; protected static final int INDEX_NUM = 1000; + private static final AtomicInteger port = new AtomicInteger(50000); + + public static synchronized int nextPort() { + return port.addAndGet(5); + } protected static Random random = new Random(); static { @@ -87,7 +92,7 @@ public class IntegrationTestBase { } - private static String createBaseDir() { + public static String createBaseDir() { String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); final File file = new File(baseDir); if (file.exists()) { @@ -112,7 +117,7 @@ public class IntegrationTestBase { logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); namesrvController.start(); } catch (Exception e) { - logger.info("Name Server start failed"); + logger.info("Name Server start failed", e); System.exit(1); } NAMESRV_CONTROLLERS.add(namesrvController); @@ -123,8 +128,6 @@ public class IntegrationTestBase { public static BrokerController createAndStartBroker(String nsAddr) { String baseDir = createBaseDir(); BrokerConfig brokerConfig = new BrokerConfig(); - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - NettyClientConfig nettyClientConfig = new NettyClientConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig(); brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); brokerConfig.setBrokerIP1("127.0.0.1"); @@ -132,18 +135,25 @@ public class IntegrationTestBase { brokerConfig.setEnablePropertyFilter(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); - storeConfig.setHaListenPort(8000 + random.nextInt(1000)); - storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE); + storeConfig.setMapedFileSizeCommitLog(100 * 1024 * 1024); storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); - nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + return createAndStartBroker(storeConfig, brokerConfig); + + } + + public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) { + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyServerConfig.setListenPort(nextPort()); + storeConfig.setHaListenPort(nextPort()); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); try { Assert.assertTrue(brokerController.initialize()); logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); brokerController.start(); - } catch (Exception e) { - logger.info("Broker start failed"); + } catch (Throwable t) { + logger.error("Broker start failed, will exit", t); System.exit(1); } BROKER_CONTROLLERS.add(brokerController); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java b/test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java new file mode 100644 index 0000000000000000000000000000000000000000..30ddff4f24136f7ceb5f02f5446c2483f5d53a5a --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java @@ -0,0 +1,102 @@ +package org.apache.rocketmq.test.base.dleger; + +import java.util.UUID; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort; +import static sun.util.locale.BaseLocale.SEP; + +public class ProduceAndConsumeTest { + + public BrokerConfig buildBrokerConfig(String cluster, String brokerName) { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setBrokerClusterName(cluster); + brokerConfig.setBrokerName(brokerName); + brokerConfig.setBrokerIP1("127.0.0.1"); + brokerConfig.setNamesrvAddr(BaseConf.nsAddr); + return brokerConfig; + } + + public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + String baseDir = IntegrationTestBase.createBaseDir(); + storeConfig.setStorePathRootDir(baseDir); + storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); + storeConfig.setHaListenPort(nextPort()); + storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024); + storeConfig.setEnableDLegerCommitLog(true); + storeConfig.setdLegerGroup(brokerName); + storeConfig.setdLegerSelfId(selfId); + storeConfig.setdLegerPeers(peers); + return storeConfig; + } + + @Test + public void testProduceAndConsume() throws Exception { + String cluster = UUID.randomUUID().toString(); + String brokerName = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = String.format("n0-localhost:%d", nextPort()); + BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName); + MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId); + BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); + Thread.sleep(1000); + + Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole()); + + + String topic = UUID.randomUUID().toString(); + String consumerGroup = UUID.randomUUID().toString(); + IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr); + DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup); + + for (int i = 0; i < 10; i++) { + Message message = new Message(); + message.setTopic(topic); + message.setBody(("Hello" + i).getBytes()); + SendResult sendResult = producer.send(message); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + Assert.assertEquals(0, sendResult.getMessageQueue().getQueueId()); + Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName()); + Assert.assertEquals(i, sendResult.getQueueOffset()); + Assert.assertNotNull(sendResult.getMsgId()); + Assert.assertNotNull(sendResult.getOffsetMsgId()); + } + + Thread.sleep(500); + Assert.assertEquals(0, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(10, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0)); + + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0); + PullResult pullResult= consumer.pull(messageQueue, "*", 0, 32); + Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus()); + Assert.assertEquals(10, pullResult.getMsgFoundList().size()); + + for (int i = 0; i < 10; i++) { + MessageExt messageExt = pullResult.getMsgFoundList().get(i); + Assert.assertEquals(i, messageExt.getQueueOffset()); + Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody()); + } + + brokerController.shutdown(); + } +}