未验证 提交 bb9f106e 编写于 作者: rushsky518's avatar rushsky518 提交者: GitHub

[ISSUE #1904] Print log when flush timeout (#1903)

* rollback my code

* avoid log when disk flush

* when msg is in next file, flushOK value may be wrong
上级 912cedaf
...@@ -661,14 +661,18 @@ public class CommitLog { ...@@ -661,14 +661,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg); CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) { if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); putMessageResult.setPutMessageStatus(flushStatus);
} }
if (replicaStatus != PutMessageStatus.PUT_OK) { if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus); putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
} }
return putMessageResult; return putMessageResult;
}); });
...@@ -762,15 +766,18 @@ public class CommitLog { ...@@ -762,15 +766,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> { return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) { if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); putMessageResult.setPutMessageStatus(flushStatus);
} }
if (replicaStatus != PutMessageStatus.PUT_OK) { if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus); putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}",
messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString());
}
} }
return putMessageResult; return putMessageResult;
}); });
...@@ -900,8 +907,7 @@ public class CommitLog { ...@@ -900,8 +907,7 @@ public class CommitLog {
return putMessageResult; return putMessageResult;
} }
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
MessageExt messageExt) {
// Synchronization flush // Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
...@@ -926,8 +932,7 @@ public class CommitLog { ...@@ -926,8 +932,7 @@ public class CommitLog {
} }
} }
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService(); HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) { if (messageExt.isWaitStoreMsgOK()) {
...@@ -1420,13 +1425,10 @@ public class CommitLog { ...@@ -1420,13 +1425,10 @@ public class CommitLog {
for (GroupCommitRequest req : this.requestsRead) { for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of // There may be a message in the next file, so a maximum of
// two times the flush // two times the flush
boolean flushOK = false; boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) { for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0); CommitLog.this.mappedFileQueue.flush(0);
} flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
} }
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册