diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 1db019bec810accdf844597dc2783910f05c1e1c..b95bab62490b51041d2afdc357362d8141db590d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public boolean appendToCommitLog(long startOffset, byte[] data) { - return next.appendToCommitLog(startOffset, data); + public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { + return next.appendToCommitLog(startOffset, data, dataStart, dataLength); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 43b01f08d8e2a6f3f0418e7ef996a428535ecc4c..57fa3637856380a2bb03767d165ad811880bea68 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1185,7 +1185,7 @@ public class CommitLog { this.mappedFileQueue.destroy(); } - public boolean appendData(long startOffset, byte[] data) { + public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) { putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); @@ -1194,7 +1194,7 @@ public class CommitLog { return false; } - return mappedFile.appendMessage(data); + return mappedFile.appendMessage(data, dataStart, dataLength); } finally { putMessageLock.unlock(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b8ecdee8cc63141876bf6b8f0b2efb3f811479b1..7dd5a32b2fab0d873736479ddde8f9c402e3ec0c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore { } @Override - public boolean appendToCommitLog(long startOffset, byte[] data) { + public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { if (this.shutdown) { log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); return false; } - boolean result = this.commitLog.appendData(startOffset, data); + boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength); if (result) { this.reputMessageService.wakeup(); } else { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 64eb5250de6e59f7ce3ef19033cd40b5d5f0254f..a8c658bfe2635eeaa847faf94f7171d9a8432901 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -245,9 +245,11 @@ public interface MessageStore { * * @param startOffset starting offset. * @param data data to append. + * @param dataStart the start index of data array + * @param dataLength the length of data array * @return true if success; false otherwise. */ - boolean appendToCommitLog(final long startOffset, final byte[] data); + boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength); /** * Execute file deletion manually. diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3b9876030d7663dd3414f41fe4d71d006b6c41e4..ea791bd9ff66dcc3bf32f4b0c4f6e65aec3c12f9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog { } @Override - public boolean appendData(long startOffset, byte[] data) { + public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) { //the old ha service will invoke method, here to prevent it return false; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index d4d410996789d6de64d7b66fcf3804b25af3ba41..845935bb9dd4212625b2a4399c93d0db4d6952aa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -440,7 +440,6 @@ public class HAService { private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size - int readSocketPos = this.byteBufferRead.position(); while (true) { int diff = this.byteBufferRead.position() - this.dispatchPosition; @@ -459,13 +458,12 @@ public class HAService { } if (diff >= (msgHeaderSize + bodySize)) { - byte[] bodyData = new byte[bodySize]; - this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); - this.byteBufferRead.get(bodyData); + byte[] bodyData = byteBufferRead.array(); + int dataStart = this.dispatchPosition + msgHeaderSize; - HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); + HAService.this.defaultMessageStore.appendToCommitLog( + masterPhyOffset, bodyData, dataStart, bodySize); - this.byteBufferRead.position(readSocketPos); this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) {