提交 2e04ca76 编写于 作者: R ranqiqiang 提交者: dinglei

[RIP-10]fix-bug: ScheduleMessageServiceTest (#869)

[RIP-10]fix-bug: ScheduleMessageServiceTest 
上级 6eff5048
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.store.schedule; package org.apache.rocketmq.store.schedule;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -45,17 +46,17 @@ import static org.assertj.core.api.Assertions.assertThat; ...@@ -45,17 +46,17 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ScheduleMessageServiceTest { public class ScheduleMessageServiceTest {
/**t /**
* t
* defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
*/ */
String testMessageDelayLevel = "5s 10s"; String testMessageDelayLevel = "2s 3s";
/** /**
* choose delay level * choose delay level
* 1 = 5s
*/ */
int delayLevel = 1; int delayLevel = 2;
private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test"+ UUID.randomUUID(); private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test#" + UUID.randomUUID();
private static final int commitLogFileSize = 1024; private static final int commitLogFileSize = 1024;
private static final int cqFileSize = 10; private static final int cqFileSize = 10;
private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64); private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
...@@ -68,6 +69,8 @@ public class ScheduleMessageServiceTest { ...@@ -68,6 +69,8 @@ public class ScheduleMessageServiceTest {
ScheduleMessageService scheduleMessageService; ScheduleMessageService scheduleMessageService;
static String sendMessage = " ------- schedule message test -------"; static String sendMessage = " ------- schedule message test -------";
static String topic = "schedule_topic_test";
static String messageGroup = "delayGroupTest";
static { static {
...@@ -107,60 +110,32 @@ public class ScheduleMessageServiceTest { ...@@ -107,60 +110,32 @@ public class ScheduleMessageServiceTest {
} }
@Test
public void buildRunningStatsTest() throws InterruptedException {
MessageExtBrokerInner msg = buildMessage();
msg.setDelayTimeLevel(delayLevel);
messageStore.putMessage(msg);
// wait offsetTable
TimeUnit.SECONDS.sleep(1);
scheduleMessageService.buildRunningStats(new HashMap<String, String>() );
}
@Test
public void computeDeliverTimestampTest() {
// testMessageDelayLevel just "5s 10s"
long storeTime = System.currentTimeMillis();
long time1 = scheduleMessageService.computeDeliverTimestamp(1, storeTime);
assertThat(time1).isEqualTo(storeTime + 5 * 1000);
long time2 = scheduleMessageService.computeDeliverTimestamp(2, storeTime);
assertThat(time2).isEqualTo(storeTime + 10 * 1000);
}
@Test
public void delayLevel2QueueIdTest() {
int queueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
assertThat(queueId).isEqualTo(delayLevel - 1);
queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
assertThat(queueId).isEqualTo(delayLevel + 1);
}
@Test @Test
public void deliverDelayedMessageTimerTaskTest() throws InterruptedException { public void deliverDelayedMessageTimerTaskTest() throws InterruptedException {
MessageExtBrokerInner msg = buildMessage(); MessageExtBrokerInner msg = buildMessage();
int realQueueId = msg.getQueueId();
// set delayLevel,and send delay message // set delayLevel,and send delay message
msg.setDelayTimeLevel(delayLevel); msg.setDelayTimeLevel(delayLevel);
PutMessageResult result = messageStore.putMessage(msg); PutMessageResult result = messageStore.putMessage(msg);
assertThat(result.isOk()).isTrue(); assertThat(result.isOk()).isTrue();
// consumer message // consumer message
int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
assertThat(delayQueueId).isEqualTo(delayLevel - 1);
Long offset = result.getAppendMessageResult().getLogicsOffset(); Long offset = result.getAppendMessageResult().getLogicsOffset();
String messageGroup = "delayGroupTest";
GetMessageResult messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
msg.getQueueId(),offset,1,null);
// now, no message in queue,must wait > 5 seconds // now, no message in queue,must wait > delayTime
GetMessageResult messageResult = getMessage(realQueueId, offset);
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE); assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
// timer run maybe delay, then consumer message again
// and wait offsetTable
TimeUnit.SECONDS.sleep(3);
scheduleMessageService.buildRunningStats(new HashMap<String, String>());
TimeUnit.SECONDS.sleep(6); messageResult = getMessage(realQueueId, offset);
messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
msg.getQueueId(),offset,1,null);
// now,found the message // now,found the message
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
...@@ -178,22 +153,42 @@ public class ScheduleMessageServiceTest { ...@@ -178,22 +153,42 @@ public class ScheduleMessageServiceTest {
String retryMsg = new String(msgList.get(0).getBody()); String retryMsg = new String(msgList.get(0).getBody());
assertThat(sendMessage).isEqualTo(retryMsg); assertThat(sendMessage).isEqualTo(retryMsg);
// method will wait 10s,so I run it by myself
scheduleMessageService.persist();
// add mapFile release // add mapFile release
messageResult.release(); messageResult.release();
} }
/**
* add some [error/no use] code test
*/
@Test @Test
public void persist(){ public void otherTest() {
// because of the method will wait 10s // the method no use ,why need ?
scheduleMessageService.persist(); int queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
assertThat(queueId).isEqualTo(delayLevel + 1);
// error delayLevelTest
Long time = scheduleMessageService.computeDeliverTimestamp(999, 0);
assertThat(time).isEqualTo(1000);
// just decode
scheduleMessageService.decode(new DelayOffsetSerializeWrapper().toJson());
}
private GetMessageResult getMessage(int queueId, Long offset) {
return messageStore.getMessage(messageGroup, topic,
queueId, offset, 1, null);
} }
@After @After
public void shutdown() throws InterruptedException { public void shutdown() throws InterruptedException {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
scheduleMessageService.shutdown();
messageStore.shutdown(); messageStore.shutdown();
messageStore.destroy(); messageStore.destroy();
File file = new File(messageStoreConfig.getStorePathRootDir()); File file = new File(messageStoreConfig.getStorePathRootDir());
...@@ -205,11 +200,10 @@ public class ScheduleMessageServiceTest { ...@@ -205,11 +200,10 @@ public class ScheduleMessageServiceTest {
byte[] msgBody = sendMessage.getBytes(); byte[] msgBody = sendMessage.getBytes();
MessageExtBrokerInner msg = new MessageExtBrokerInner(); MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("schedule_topic_test"); msg.setTopic(topic);
msg.setTags("schedule_tag"); msg.setTags("schedule_tag");
msg.setKeys("schedule_key"); msg.setKeys("schedule_key");
msg.setBody(msgBody); msg.setBody(msgBody);
msg.setQueueId(0);
msg.setSysFlag(0); msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis()); msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost); msg.setStoreHost(storeHost);
...@@ -226,6 +220,4 @@ public class ScheduleMessageServiceTest { ...@@ -226,6 +220,4 @@ public class ScheduleMessageServiceTest {
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册