diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..59809c27cb3fa027b4da581dbf6fe5620a89ab50 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.index.IndexFile; +import org.apache.rocketmq.store.index.IndexService; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; + + +public class StoreTestUtil { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(StoreTestUtil.class); + + public static boolean isCommitLogAvailable(DefaultMessageStore store) { + try { + + Field serviceField = store.getClass().getDeclaredField("reputMessageService"); + serviceField.setAccessible(true); + DefaultMessageStore.ReputMessageService reputService = + (DefaultMessageStore.ReputMessageService) serviceField.get(store); + + Method method = DefaultMessageStore.ReputMessageService.class.getDeclaredMethod("isCommitLogAvailable"); + method.setAccessible(true); + return (boolean) method.invoke(reputService); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + public static void flushConsumeQueue(DefaultMessageStore store) throws Exception { + Field field = store.getClass().getDeclaredField("flushConsumeQueueService"); + field.setAccessible(true); + DefaultMessageStore.FlushConsumeQueueService flushService = (DefaultMessageStore.FlushConsumeQueueService) field.get(store); + + final int RETRY_TIMES_OVER = 3; + Method method = DefaultMessageStore.FlushConsumeQueueService.class.getDeclaredMethod("doFlush", int.class); + method.setAccessible(true); + method.invoke(flushService, RETRY_TIMES_OVER); + } + + + public static void waitCommitLogReput(DefaultMessageStore store) { + for (int i = 0; i < 500 && isCommitLogAvailable(store); i++) { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } + + if (isCommitLogAvailable(store)) { + log.warn("isCommitLogAvailable expected false ,but true"); + } + } + + + public static void flushConsumeIndex(DefaultMessageStore store) throws NoSuchFieldException, Exception { + Field field = store.getClass().getDeclaredField("indexService"); + field.setAccessible(true); + IndexService indexService = (IndexService) field.get(store); + + Field field2 = indexService.getClass().getDeclaredField("indexFileList"); + field2.setAccessible(true); + ArrayList indexFileList = (ArrayList) field2.get(indexService); + + for (IndexFile f : indexFileList) { + indexService.flush(f); + } + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index c436aebdc01481213ab2ac1880eefe8536cde6f5..befbefd28d6476ee88b1d82da9ad30cd40bf00ef 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -50,7 +50,7 @@ public class ScheduleMessageServiceTest { * t * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" */ - String testMessageDelayLevel = "2s 3s"; + String testMessageDelayLevel = "5s 8s"; /** * choose delay level */ @@ -111,7 +111,7 @@ public class ScheduleMessageServiceTest { @Test - public void deliverDelayedMessageTimerTaskTest() throws InterruptedException { + public void deliverDelayedMessageTimerTaskTest() throws Exception { MessageExtBrokerInner msg = buildMessage(); int realQueueId = msg.getQueueId(); // set delayLevel,and send delay message @@ -119,6 +119,8 @@ public class ScheduleMessageServiceTest { PutMessageResult result = messageStore.putMessage(msg); assertThat(result.isOk()).isTrue(); + // make sure consumerQueue offset = commitLog offset + StoreTestUtil.waitCommitLogReput(messageStore); // consumer message int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel); @@ -132,7 +134,7 @@ public class ScheduleMessageServiceTest { // timer run maybe delay, then consumer message again // and wait offsetTable - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(10); scheduleMessageService.buildRunningStats(new HashMap()); messageResult = getMessage(realQueueId, offset); @@ -188,7 +190,6 @@ public class ScheduleMessageServiceTest { @After public void shutdown() throws InterruptedException { - TimeUnit.SECONDS.sleep(1); messageStore.shutdown(); messageStore.destroy(); File file = new File(messageStoreConfig.getStorePathRootDir());