diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d489e84371e4ab3372a74b7718520df552f3536a..cce6481b8daa119def971b3c4c2f95d4b3e6502a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -661,14 +661,18 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); - CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); + CompletableFuture flushResultFuture = submitFlushRequest(result, msg); + CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); + if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) { + log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}", + msg.getTopic(), msg.getTags(), msg.getBornHostNameString()); + } } return putMessageResult; }); @@ -762,15 +766,18 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); - CompletableFuture flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); - CompletableFuture replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); + CompletableFuture flushOKFuture = submitFlushRequest(result, messageExtBatch); + CompletableFuture replicaOKFuture = submitReplicaRequest(result, messageExtBatch); return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + putMessageResult.setPutMessageStatus(flushStatus); } - if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); + if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) { + log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}", + messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString()); + } } return putMessageResult; }); @@ -900,8 +907,7 @@ public class CommitLog { return putMessageResult; } - public CompletableFuture submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, - MessageExt messageExt) { + public CompletableFuture submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; @@ -926,8 +932,7 @@ public class CommitLog { } } - public CompletableFuture submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, - MessageExt messageExt) { + public CompletableFuture submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { @@ -1420,13 +1425,10 @@ public class CommitLog { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush - boolean flushOK = false; + boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) { + CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - - if (!flushOK) { - CommitLog.this.mappedFileQueue.flush(0); - } } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);