diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 0a166d9551bc7ae37d02a1e89dcfe9f46739c548..83efcc1915e7a1845a652d9e872108930eacd74c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -29,6 +29,9 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -90,7 +93,7 @@ public class HATest { } @Test - public void testHandleHA() throws Exception{ + public void testHandleHA() { long totalMsgs = 10; QUEUE_TOTAL = 1; MessageBody = StoreMessage.getBytes(); @@ -98,7 +101,20 @@ public class HATest { messageStore.putMessage(buildMessage()); } - Thread.sleep(1000L);//sleep 1000 ms + for (int i = 0; i < 100 && isCommitLogAvailable((DefaultMessageStore) messageStore); i++) { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } + + for (int i = 0; i < 100 && isCommitLogAvailable((DefaultMessageStore) slaveMessageStore); i++) { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + } + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = slaveMessageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); @@ -147,4 +163,20 @@ public class HATest { return msg; } + private boolean isCommitLogAvailable(DefaultMessageStore store) { + try { + + Field serviceField = store.getClass().getDeclaredField("reputMessageService"); + serviceField.setAccessible(true); + DefaultMessageStore.ReputMessageService reputService = + (DefaultMessageStore.ReputMessageService) serviceField.get(store); + + Method method = DefaultMessageStore.ReputMessageService.class.getDeclaredMethod("isCommitLogAvailable"); + method.setAccessible(true); + return (boolean) method.invoke(reputService); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | NoSuchFieldException e ) { + throw new RuntimeException(e); + } + } + }