From 0a9822e5f4667654b37bd480865174d1bb2c28fe Mon Sep 17 00:00:00 2001 From: Heng Du Date: Wed, 1 Apr 2020 15:25:55 +0800 Subject: [PATCH] [ISSUE #1895] Expose the flush disk timeout error --- .../main/java/org/apache/rocketmq/store/CommitLog.java | 9 +++------ .../java/org/apache/rocketmq/store/ha/HAService.java | 3 ++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 82bd6706..651fa56d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1383,11 +1383,8 @@ public class CommitLog { return nextOffset; } - public void wakeupCustomer(final boolean flushOK) { - long endTimestamp = System.currentTimeMillis(); - PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ? - PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT; - this.flushOKFuture.complete(result); + public void wakeupCustomer(final PutMessageStatus putMessageStatus) { + this.flushOKFuture.complete(putMessageStatus); } public CompletableFuture future() { @@ -1433,7 +1430,7 @@ public class CommitLog { } } - req.wakeupCustomer(flushOK); + req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 3035c575..d3220d09 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.PutMessageStatus; public class HAService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -291,7 +292,7 @@ public class HAService { log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); } - req.wakeupCustomer(transferOK); + req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } this.requestsRead.clear(); -- GitLab