diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 90fd6f3f169871fa1d6a226b08a6d0a6a8a901f4..3b9876030d7663dd3414f41fe4d71d006b6c41e4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -562,11 +562,13 @@ public class DLedgerCommitLog extends CommitLog { request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBatchMsgs(encodeResult.batchData); - dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); + AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (appendFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); } + dledgerFuture = (BatchAppendFuture) appendFuture; + long wroteOffset = 0; int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; @@ -789,11 +791,13 @@ public class DLedgerCommitLog extends CommitLog { request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBatchMsgs(encodeResult.batchData); - dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); + AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (appendFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } + dledgerFuture = (BatchAppendFuture) appendFuture; + long wroteOffset = 0; int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;