未验证 提交 0a9822e5 编写于 作者: H Heng Du 提交者: GitHub

[ISSUE #1895] Expose the flush disk timeout error

上级 18a879b5
......@@ -1383,11 +1383,8 @@ public class CommitLog {
return nextOffset;
}
public void wakeupCustomer(final boolean flushOK) {
long endTimestamp = System.currentTimeMillis();
PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ?
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
this.flushOKFuture.complete(result);
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
this.flushOKFuture.complete(putMessageStatus);
}
public CompletableFuture<PutMessageStatus> future() {
......@@ -1433,7 +1430,7 @@ public class CommitLog {
}
}
req.wakeupCustomer(flushOK);
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageStatus;
public class HAService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -291,7 +292,7 @@ public class HAService {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册