diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 028d53795997292b0082eecc9e13176c984a7bae..e923f879d543fd3909fbf9b2d2ecb36ccf35976c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.dledger; +import io.openmessaging.storage.dledger.AppendFuture; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.entry.DLedgerEntry; @@ -28,7 +29,6 @@ import io.openmessaging.storage.dledger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; @@ -357,54 +357,49 @@ public class DLedgerCommitLog extends CommitLog { } // Back to Results - AppendMessageResult appendResult = null; - PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; - CompletableFuture dlegerFuture = null; - EncodeResult encodeResult = null; - long eclipseTimeInLock = 0L; + PutMessageStatus putMessageStatus = null; + AppendMessageResult appendResult; + AppendFuture dledgerFuture; + EncodeResult encodeResult; + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - long queueOffset = -1; + long eclipseTimeInLock; + long queueOffset; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - //TO DO use buffer encodeResult = this.messageSerializer.serialize(msg); queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); if (encodeResult.status != AppendMessageStatus.PUT_OK) { - appendResult = new AppendMessageResult(encodeResult.status); - switch (encodeResult.status) { - case PROPERTIES_SIZE_EXCEEDED: - case MESSAGE_SIZE_EXCEEDED: - putMessageStatus = PutMessageStatus.MESSAGE_ILLEGAL; - break; - } - } else { - AppendEntryRequest request = new AppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.data); - dlegerFuture = dLedgerServer.handleAppend(request); - if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLedgerResponseCode.SUCCESS.getCode()) { - //TO DO make sure the local store is ok - appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); - } else { - switch (tranType) { - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - // The next update ConsumeQueue information - DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); - break; - default: - break; - } - } + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); + } + AppendEntryRequest request = new AppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBody(encodeResult.data); + dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); } + long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; + ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock); + switch (tranType) { + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + // The next update ConsumeQueue information + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); + break; + default: + break; + } } catch (Exception e) { log.error("Put message error", e); - appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); } finally { beginTimeInDledgerLock = 0; putMessageLock.unlock(); @@ -414,34 +409,27 @@ public class DLedgerCommitLog extends CommitLog { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult); } - if (dlegerFuture != null) { - try { - AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS); - switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { - case SUCCESS: - putMessageStatus = PutMessageStatus.PUT_OK; - long wroteOffset = appendEntryResponse.getPos() + DLedgerEntry.BODY_OFFSET; - ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); - String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0); - break; - case INCONSISTENT_LEADER: - case NOT_LEADER: - case LEADER_NOT_READY: - case DISK_FULL: - putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; - break; - case WAIT_QUORUM_ACK_TIMEOUT: - putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; - break; - case LEADER_PENDING_FULL: - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - } - } catch (Exception ignored) { - putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; - appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + try { + AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { + case SUCCESS: + putMessageStatus = PutMessageStatus.PUT_OK; + break; + case INCONSISTENT_LEADER: + case NOT_LEADER: + case LEADER_NOT_READY: + case DISK_FULL: + putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; + break; + case WAIT_QUORUM_ACK_TIMEOUT: + putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + break; + case LEADER_PENDING_FULL: + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; } + } catch (Throwable ignored) { + putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; } PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 57742856d32358d2884b499b644efb07967b9a49..57f04ea6a33d58df5ac49e6c5662cc895214befe 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -12,19 +12,21 @@ import io.openmessaging.storage.dledger.DLedgerServer; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MappedFile; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.StoreTestBase; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Assert; import org.junit.Test; public class DLedgerCommitlogTest extends StoreTestBase { - private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId) throws Exception { + private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception { baseDirs.add(base); MessageStoreConfig storeConfig = new MessageStoreConfig(); storeConfig.setMapedFileSizeCommitLog(1024 * 100); @@ -39,7 +41,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { storeConfig.setdLegerGroup(group); storeConfig.setdLegerPeers(peers); storeConfig.setdLegerSelfId(selfId); - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { }, new BrokerConfig()); if (leaderId != null) { @@ -52,17 +54,67 @@ public class DLedgerCommitlogTest extends StoreTestBase { } } + if (createAbort) { + String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); + File file = new File(fileName); + MappedFile.ensureDirOK(file.getParent()); + file.createNewFile(); + } defaultMessageStore.load(); defaultMessageStore.start(); return defaultMessageStore; } + + + @Test + public void testReputOffset() throws Exception { + + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + + { + DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); + Thread.sleep(1000); + String topic = UUID.randomUUID().toString(); + for (int i = 0; i < 1000; i++) { + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + messageStore.shutdown(); + } + + { + //normal recover + DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + messageStore.shutdown(); + } + { + //normal recover + DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, true); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + messageStore.shutdown(); + } + } + + + @Test public void testPutAndGetMessage() throws Exception { String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null); + DefaultMessageStore messageStore = createMessageStore(base, group, "n0", peers, null, false); Thread.sleep(1000); String topic = UUID.randomUUID().toString(); @@ -102,7 +154,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { public void testCommittedPos() throws Exception { String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0"); + DefaultMessageStore leaderStore = createMessageStore(createBaseDir(), group,"n0", peers, "n0", false); String topic = UUID.randomUUID().toString(); MessageExtBrokerInner msgInner = buildMessage(); @@ -117,7 +169,7 @@ public class DLedgerCommitlogTest extends StoreTestBase { Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); - DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0"); + DefaultMessageStore followerStore = createMessageStore(createBaseDir(), group,"n1", peers, "n0", false); Thread.sleep(2000); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));