提交 415b38bd 编写于 作者: D dongeforever

Do not return flush_slave_timeout to client

上级 235ac796
...@@ -27,6 +27,7 @@ import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; ...@@ -27,6 +27,7 @@ import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFile; import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dledger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -190,7 +191,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -190,7 +191,7 @@ public class DLedgerCommitLog extends CommitLog {
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
while (!mappedFile.destroy(10 * 1000)) { while (!mappedFile.destroy(10 * 1000)) {
io.openmessaging.storage.dledger.utils.UtilAll.sleep(1000); DLedgerUtils.sleep(1000);
} }
mappedFileQueue.getMappedFiles().remove(mappedFile); mappedFileQueue.getMappedFiles().remove(mappedFile);
} }
...@@ -397,7 +398,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -397,7 +398,6 @@ public class DLedgerCommitLog extends CommitLog {
} }
// Back to Results // Back to Results
PutMessageStatus putMessageStatus = null;
AppendMessageResult appendResult; AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture; AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult; EncodeResult encodeResult;
...@@ -449,6 +449,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -449,6 +449,7 @@ public class DLedgerCommitLog extends CommitLog {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult); log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult);
} }
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
try { try {
AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
...@@ -462,14 +463,15 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -462,14 +463,15 @@ public class DLedgerCommitLog extends CommitLog {
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break; break;
case WAIT_QUORUM_ACK_TIMEOUT: case WAIT_QUORUM_ACK_TIMEOUT:
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break; break;
case LEADER_PENDING_FULL: case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break; break;
} }
} catch (Throwable ignored) { } catch (Throwable t) {
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT; log.error("Failed to get dledger append result", t);
} }
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
......
...@@ -111,7 +111,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -111,7 +111,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
msgInner.setTopic(topic); msgInner.setTopic(topic);
msgInner.setQueueId(0); msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, putMessageResult.getPutMessageStatus()); Assert.assertEquals(PutMessageStatus.OS_PAGECACHE_BUSY, putMessageResult.getPutMessageStatus());
Thread.sleep(1000); Thread.sleep(1000);
......
...@@ -21,6 +21,8 @@ import org.junit.Assert; ...@@ -21,6 +21,8 @@ import org.junit.Assert;
public class MessageStoreTestBase extends StoreTestBase { public class MessageStoreTestBase extends StoreTestBase {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception { protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort) throws Exception {
System.setProperty("dledger.disk.ratio.check", "0.95");
System.setProperty("dledger.disk.ratio.clean", "0.95");
baseDirs.add(base); baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100); storeConfig.setMapedFileSizeCommitLog(1024 * 100);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册