diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index dc05c3b2976accb696fcef247b28d74f3773988a..c436aebdc01481213ab2ac1880eefe8536cde6f5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.schedule; + import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; @@ -45,17 +46,17 @@ import static org.assertj.core.api.Assertions.assertThat; public class ScheduleMessageServiceTest { - /**t + /** + * t * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" */ - String testMessageDelayLevel = "5s 10s"; + String testMessageDelayLevel = "2s 3s"; /** * choose delay level - * 1 = 5s */ - int delayLevel = 1; + int delayLevel = 2; - private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test"+ UUID.randomUUID(); + private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test#" + UUID.randomUUID(); private static final int commitLogFileSize = 1024; private static final int cqFileSize = 10; private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64); @@ -67,7 +68,9 @@ public class ScheduleMessageServiceTest { BrokerConfig brokerConfig; ScheduleMessageService scheduleMessageService; - static String sendMessage = " ------- schedule message test -------"; + static String sendMessage = " ------- schedule message test -------"; + static String topic = "schedule_topic_test"; + static String messageGroup = "delayGroupTest"; static { @@ -107,67 +110,39 @@ public class ScheduleMessageServiceTest { } - - @Test - public void buildRunningStatsTest() throws InterruptedException { - MessageExtBrokerInner msg = buildMessage(); - msg.setDelayTimeLevel(delayLevel); - messageStore.putMessage(msg); - // wait offsetTable - TimeUnit.SECONDS.sleep(1); - scheduleMessageService.buildRunningStats(new HashMap() ); - } - - - @Test - public void computeDeliverTimestampTest() { - // testMessageDelayLevel just "5s 10s" - long storeTime = System.currentTimeMillis(); - long time1 = scheduleMessageService.computeDeliverTimestamp(1, storeTime); - assertThat(time1).isEqualTo(storeTime + 5 * 1000); - - long time2 = scheduleMessageService.computeDeliverTimestamp(2, storeTime); - assertThat(time2).isEqualTo(storeTime + 10 * 1000); - - } - - - @Test - public void delayLevel2QueueIdTest() { - int queueId = ScheduleMessageService.delayLevel2QueueId(delayLevel); - assertThat(queueId).isEqualTo(delayLevel - 1); - queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel); - assertThat(queueId).isEqualTo(delayLevel + 1); - } - @Test public void deliverDelayedMessageTimerTaskTest() throws InterruptedException { MessageExtBrokerInner msg = buildMessage(); + int realQueueId = msg.getQueueId(); // set delayLevel,and send delay message msg.setDelayTimeLevel(delayLevel); PutMessageResult result = messageStore.putMessage(msg); assertThat(result.isOk()).isTrue(); + // consumer message + int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel); + assertThat(delayQueueId).isEqualTo(delayLevel - 1); + Long offset = result.getAppendMessageResult().getLogicsOffset(); - String messageGroup = "delayGroupTest"; - GetMessageResult messageResult = messageStore.getMessage(messageGroup,msg.getTopic(), - msg.getQueueId(),offset,1,null); - // now, no message in queue,must wait > 5 seconds + // now, no message in queue,must wait > delayTime + GetMessageResult messageResult = getMessage(realQueueId, offset); assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE); + // timer run maybe delay, then consumer message again + // and wait offsetTable + TimeUnit.SECONDS.sleep(3); + scheduleMessageService.buildRunningStats(new HashMap()); - TimeUnit.SECONDS.sleep(6); - messageResult = messageStore.getMessage(messageGroup,msg.getTopic(), - msg.getQueueId(),offset,1,null); + messageResult = getMessage(realQueueId, offset); // now,found the message assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); // get the message body ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize()); - List byteBufferList = messageResult.getMessageBufferList(); + List byteBufferList = messageResult.getMessageBufferList(); for (ByteBuffer bb : byteBufferList) { byteBuffer.put(bb); } @@ -178,22 +153,42 @@ public class ScheduleMessageServiceTest { String retryMsg = new String(msgList.get(0).getBody()); assertThat(sendMessage).isEqualTo(retryMsg); + // method will wait 10s,so I run it by myself + scheduleMessageService.persist(); + // add mapFile release messageResult.release(); } + /** + * add some [error/no use] code test + */ @Test - public void persist(){ - // because of the method will wait 10s - scheduleMessageService.persist(); + public void otherTest() { + // the method no use ,why need ? + int queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel); + assertThat(queueId).isEqualTo(delayLevel + 1); + + // error delayLevelTest + Long time = scheduleMessageService.computeDeliverTimestamp(999, 0); + assertThat(time).isEqualTo(1000); + + // just decode + scheduleMessageService.decode(new DelayOffsetSerializeWrapper().toJson()); + } + + + private GetMessageResult getMessage(int queueId, Long offset) { + return messageStore.getMessage(messageGroup, topic, + queueId, offset, 1, null); + } @After public void shutdown() throws InterruptedException { TimeUnit.SECONDS.sleep(1); - scheduleMessageService.shutdown(); messageStore.shutdown(); messageStore.destroy(); File file = new File(messageStoreConfig.getStorePathRootDir()); @@ -205,11 +200,10 @@ public class ScheduleMessageServiceTest { byte[] msgBody = sendMessage.getBytes(); MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic("schedule_topic_test"); + msg.setTopic(topic); msg.setTags("schedule_tag"); msg.setKeys("schedule_key"); msg.setBody(msgBody); - msg.setQueueId(0); msg.setSysFlag(0); msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(storeHost); @@ -226,6 +220,4 @@ public class ScheduleMessageServiceTest { } - - }