diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index b5ebefe9d61c44e6dd28d8be4420935e59b40db8..86105ca4d2c5a7b25db70fcf1bcabe79e09ebca3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -27,6 +27,7 @@ import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.MmapFile; import io.openmessaging.storage.dledger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; +import io.openmessaging.storage.dledger.utils.DLedgerUtils; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.concurrent.TimeUnit; @@ -190,7 +191,7 @@ public class DLedgerCommitLog extends CommitLog { long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { while (!mappedFile.destroy(10 * 1000)) { - io.openmessaging.storage.dledger.utils.UtilAll.sleep(1000); + DLedgerUtils.sleep(1000); } mappedFileQueue.getMappedFiles().remove(mappedFile); } @@ -397,7 +398,6 @@ public class DLedgerCommitLog extends CommitLog { } // Back to Results - PutMessageStatus putMessageStatus = null; AppendMessageResult appendResult; AppendFuture dledgerFuture; EncodeResult encodeResult; @@ -449,6 +449,7 @@ public class DLedgerCommitLog extends CommitLog { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult); } + PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; try { AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { @@ -462,14 +463,15 @@ public class DLedgerCommitLog extends CommitLog { putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; break; case WAIT_QUORUM_ACK_TIMEOUT: - putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + //Do not return flush_slave_timeout to the client, for the ons client will ignore it. + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; break; case LEADER_PENDING_FULL: putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; break; } - } catch (Throwable ignored) { - putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + } catch (Throwable t) { + log.error("Failed to get dledger append result", t); } PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index e32fabb7ebfce47c00f24a40f3347df0c85ab9df..1c774b91de8489e3cf5f5bdb30d5b49e35e68d5a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -111,7 +111,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { msgInner.setTopic(topic); msgInner.setQueueId(0); PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); - Assert.assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(PutMessageStatus.OS_PAGECACHE_BUSY, putMessageResult.getPutMessageStatus()); Thread.sleep(1000); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index fb7e3a1b580c9d4a5fa96d6cb812497e617e463d..8cb1eff3ba582c01c0ce4d74498a67614139384e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -21,6 +21,8 @@ import org.junit.Assert; public class MessageStoreTestBase extends StoreTestBase { protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception { + System.setProperty("dledger.disk.ratio.check", "0.95"); + System.setProperty("dledger.disk.ratio.clean", "0.95"); baseDirs.add(base); MessageStoreConfig storeConfig = new MessageStoreConfig(); storeConfig.setMapedFileSizeCommitLog(1024 * 100);