diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java index 3fcf588b9bf7f939a216af32e7784df38f210b95..267f7372798f0d21dc6cbf23f8636704723efff2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java @@ -4,6 +4,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.dleger.DLegerLeaderElector; import org.apache.rocketmq.dleger.MemberState; +import org.apache.rocketmq.dleger.utils.UtilAll; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; @@ -20,6 +21,7 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa } @Override public void handle(long term, MemberState.Role role) { + long start = System.currentTimeMillis(); try { log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); switch (role) { @@ -41,9 +43,9 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa default: break; } - log.info("Finish handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + log.info("Finish handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start)); } catch (Throwable t) { - log.info("Failed handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), t); + log.info("Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), UtilAll.elapsed(start), t); } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java index 16a2f27486b46c8e6a5c2f58068fe53558d589fc..3b0ee87a60380cc1567d5fa317470b07bab853f2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java @@ -133,12 +133,6 @@ public class DLegerCommitLog extends CommitLog { return 0; } - /** - * Read CommitLog data, use data replication - */ - public SelectMappedBufferResult getData(final long offset) { - return this.getData(offset, offset == 0); - } private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { @@ -164,14 +158,37 @@ public class DLegerCommitLog extends CommitLog { } + public SelectMmapBufferResult truncate(SelectMmapBufferResult sbr) { + long committedPos = dLegerFileStore.getCommittedPos(); + if (sbr == null || sbr.getStartOffset() == committedPos) { + return null; + } + if (sbr.getStartOffset() + sbr.getSize() <= committedPos) { + return sbr; + } else { + sbr.setSize((int) (committedPos - sbr.getStartOffset())); + return sbr; + } + } + + /** + * Read CommitLog data, use data replication + */ + public SelectMappedBufferResult getData(final long offset) { + return this.getData(offset, offset == 0); + } + public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { + if (offset >= dLegerFileStore.getCommittedPos()) { + return null; + } int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos); - return convertSbr(sbr); + return convertSbr(truncate(sbr)); } return null; @@ -353,6 +370,9 @@ public class DLegerCommitLog extends CommitLog { case DISK_FULL: putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; break; + case WAIT_QUORUM_ACK_TIMEOUT: + putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + break; case LEADER_PENDING_FULL: putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; break; diff --git a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java index 67c2ed9164973906bd8ef82bd4306f919673702e..193bd50c40f250c3868132822e4b836f6e983ff8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java @@ -4,15 +4,14 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.dleger.DLegerServer; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; -import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; @@ -25,7 +24,7 @@ import org.junit.Test; public class DLegerCommitlogTest extends StoreTestBase { - private DefaultMessageStore createMessageStore(String base) throws Exception { + private DefaultMessageStore createMessageStore(String base, String selfId, String peers, String leaderId) throws Exception { baseDirs.add(base); MessageStoreConfig storeConfig = new MessageStoreConfig(); storeConfig.setMapedFileSizeCommitLog(1024 * 100); @@ -38,16 +37,21 @@ public class DLegerCommitlogTest extends StoreTestBase { storeConfig.setEnableDLegerCommitLog(true); storeConfig.setdLegerGroup(UUID.randomUUID().toString()); - storeConfig.setdLegerPeers(String.format("n0-localhost:%d", nextPort())); - storeConfig.setdLegerSelfId("n0"); - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), new MessageArrivingListener() { + storeConfig.setdLegerPeers(peers); + storeConfig.setdLegerSelfId(selfId); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { - - } }, new BrokerConfig()); + if (leaderId != null) { + DLegerServer dLegerServer = ((DLegerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer(); + dLegerServer.getdLegerConfig().setEnableLeaderElector(false); + if (selfId.equals(leaderId)) { + dLegerServer.getMemberState().changeToLeader(-1); + } else { + dLegerServer.getMemberState().changeToFollower(-1, leaderId); + } + + } defaultMessageStore.load(); defaultMessageStore.start(); return defaultMessageStore; @@ -56,7 +60,8 @@ public class DLegerCommitlogTest extends StoreTestBase { @Test public void testPutAndGetMessage() throws Exception { String base = createBaseDir(); - DefaultMessageStore messageStore = createMessageStore(base); + String peers = String.format("n0-localhost:%d", nextPort()); + DefaultMessageStore messageStore = createMessageStore(base, "n0", peers, null); Thread.sleep(1000); String topic = UUID.randomUUID().toString(); @@ -91,4 +96,34 @@ public class DLegerCommitlogTest extends StoreTestBase { } + @Test + public void testCommittedPos() throws Exception { + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), "n0", peers, "n0"); + + String topic = UUID.randomUUID().toString(); + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); + Assert.assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, putMessageResult.getPutMessageStatus()); + + Thread.sleep(1000); + + Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0); + Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); + + + DefaultMessageStore followerStore = createMessageStore(createBaseDir(), "n1", peers, "n0"); + Thread.sleep(2000); + + Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0)); + + + leaderStore.destroy(); + followerStore.destroy(); + } + + }