diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b5bac3f721f62512093b953fa9d5db43658f558f..7b5ac45eddfac9a6e136ac58d9e6a43f6266825a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1111,7 +1111,7 @@ public class DefaultMessageStore implements MessageStore { return false; } - if ((messageTotal + 1) >= maxMsgNums) { + if (maxMsgNums <= messageTotal) { return true; } @@ -1120,7 +1120,7 @@ public class DefaultMessageStore implements MessageStore { return true; } - if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) { + if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) { return true; } } else { @@ -1128,7 +1128,7 @@ public class DefaultMessageStore implements MessageStore { return true; } - if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) { + if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) { return true; } } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 75f1de94128c0f6b329a86d9e08621f4e89088f5..273cc213d49f9ad8e55c7eb36545ea2a4b4b7ef3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -22,9 +22,11 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; @@ -45,19 +47,22 @@ public class DefaultMessageStoreTest { BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } + public MessageStore buildMessageStore() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMaxHashSlotNum(10000); + messageStoreConfig.setMaxIndexNum(100 * 100); + messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig()); + } + @Test public void testWriteAndRead() throws Exception { long totalMsgs = 100; QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); - - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); - messageStoreConfig.setMaxHashSlotNum(100); - messageStoreConfig.setMaxIndexNum(100 * 10); - MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); - + MessageStore master = buildMessageStore(); boolean load = master.load(); assertTrue(load); @@ -86,7 +91,7 @@ public class DefaultMessageStoreTest { msg.setBody(MessageBody); msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); - msg.setSysFlag(4); + msg.setSysFlag(0); msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(StoreHost); msg.setBornHost(BornHost); @@ -123,6 +128,36 @@ public class DefaultMessageStoreTest { } } + @Test + public void testPullSize() throws Exception { + MessageStore messageStore = buildMessageStore(); + boolean load = messageStore.load(); + assertTrue(load); + messageStore.start(); + String topic = "pullSizeTopic"; + + for (int i = 0; i < 32; i++) { + MessageExtBrokerInner messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(0); + PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner); + } + //wait for consume queue build + Thread.sleep(100); + String group = "simple"; + GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); + assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); + + + GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null); + assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20); + + GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null); + assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10); + + messageStore.shutdown(); + } + private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 92f77b8533d34e0a3015455fc21c6b1d34f15a25..8516779e4363149ac5d206113c553297978f60d4 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -41,7 +41,7 @@ public class BaseConf { protected static String clusterName; protected static int brokerNum; protected static int waitTime = 5; - protected static int consumeTime = 1 * 60 * 1000; + protected static int consumeTime = 5 * 60 * 1000; protected static NamesrvController namesrvController; protected static BrokerController brokerController1; protected static BrokerController brokerController2; diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 07af7aa6e3d3ac101d0b984f51705ae45dfd912e..61e98e2071f3a5a926a9a0376a8ad636d917e982 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -148,17 +148,17 @@ public class IntegrationTestBase { return brokerController; } - public static boolean initTopic(String topic, String nsAddr, String clusterName) { + public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){ long startTime = System.currentTimeMillis(); boolean createResult; while (true) { - createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8); + createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers); if (createResult) { break; } else if (System.currentTimeMillis() - startTime > topicCreateTime) { Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic, - System.currentTimeMillis() - startTime)); + System.currentTimeMillis() - startTime)); break; } else { TestUtils.waitForMoment(500); @@ -169,6 +169,10 @@ public class IntegrationTestBase { return createResult; } + public static boolean initTopic(String topic, String nsAddr, String clusterName) { + return initTopic(topic, nsAddr, clusterName,8); + } + public static void deleteFile(File file) { if (!file.exists()) { return; diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java index 2c9abc067fca1a4f323cec5f7ab5b78d0e1ae049..47cde74a43c9bf1742fdd7b32cb6b84ea83f90b5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java @@ -38,6 +38,8 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT { private RMQNormalProducer producer = null; private String topic = null; + private int broadcastConsumeTime = 1 * 60 * 1000; + @Before public void setUp() { topic = initTopic(); @@ -60,12 +62,12 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT { consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener()); TestUtils.waitForSeconds(waitTime); + List mqs = producer.getMessageQueue(); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); producer.send(mqMsgs.getMsgsWithMQ()); - - consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); - consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime); + consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime); assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs())) .isEqualTo(true);