From 3183122c01cc2d0dc005634d028ccbcb62495bf6 Mon Sep 17 00:00:00 2001 From: huangli Date: Tue, 6 Jul 2021 21:47:12 +0800 Subject: [PATCH] Eliminate array copy (#2886) [Part C] Improve produce performance in M/S mode. --- .../broker/plugin/AbstractPluginMessageStore.java | 4 ++-- .../main/java/org/apache/rocketmq/store/CommitLog.java | 4 ++-- .../org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++-- .../java/org/apache/rocketmq/store/MessageStore.java | 4 +++- .../rocketmq/store/dledger/DLedgerCommitLog.java | 2 +- .../java/org/apache/rocketmq/store/ha/HAService.java | 10 ++++------ 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 1db019be..b95bab62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public boolean appendToCommitLog(long startOffset, byte[] data) { - return next.appendToCommitLog(startOffset, data); + public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { + return next.appendToCommitLog(startOffset, data, dataStart, dataLength); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 43b01f08..57fa3637 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1185,7 +1185,7 @@ public class CommitLog { this.mappedFileQueue.destroy(); } - public boolean appendData(long startOffset, byte[] data) { + public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) { putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); @@ -1194,7 +1194,7 @@ public class CommitLog { return false; } - return mappedFile.appendMessage(data); + return mappedFile.appendMessage(data, dataStart, dataLength); } finally { putMessageLock.unlock(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b8ecdee8..7dd5a32b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore { } @Override - public boolean appendToCommitLog(long startOffset, byte[] data) { + public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { if (this.shutdown) { log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); return false; } - boolean result = this.commitLog.appendData(startOffset, data); + boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength); if (result) { this.reputMessageService.wakeup(); } else { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 64eb5250..a8c658bf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -245,9 +245,11 @@ public interface MessageStore { * * @param startOffset starting offset. * @param data data to append. + * @param dataStart the start index of data array + * @param dataLength the length of data array * @return true if success; false otherwise. */ - boolean appendToCommitLog(final long startOffset, final byte[] data); + boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength); /** * Execute file deletion manually. diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3b987603..ea791bd9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog { } @Override - public boolean appendData(long startOffset, byte[] data) { + public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) { //the old ha service will invoke method, here to prevent it return false; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index d4d41099..845935bb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -440,7 +440,6 @@ public class HAService { private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size - int readSocketPos = this.byteBufferRead.position(); while (true) { int diff = this.byteBufferRead.position() - this.dispatchPosition; @@ -459,13 +458,12 @@ public class HAService { } if (diff >= (msgHeaderSize + bodySize)) { - byte[] bodyData = new byte[bodySize]; - this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); - this.byteBufferRead.get(bodyData); + byte[] bodyData = byteBufferRead.array(); + int dataStart = this.dispatchPosition + msgHeaderSize; - HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); + HAService.this.defaultMessageStore.appendToCommitLog( + masterPhyOffset, bodyData, dataStart, bodySize); - this.byteBufferRead.position(readSocketPos); this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { -- GitLab