未验证 提交 3183122c 编写于 作者: H huangli 提交者: GitHub

Eliminate array copy (#2886)

[Part C] Improve produce performance in M/S mode.
上级 d65778f3
...@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
} }
@Override @Override
public boolean appendToCommitLog(long startOffset, byte[] data) { public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
return next.appendToCommitLog(startOffset, data); return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
} }
@Override @Override
......
...@@ -1185,7 +1185,7 @@ public class CommitLog { ...@@ -1185,7 +1185,7 @@ public class CommitLog {
this.mappedFileQueue.destroy(); this.mappedFileQueue.destroy();
} }
public boolean appendData(long startOffset, byte[] data) { public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock(); putMessageLock.lock();
try { try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
...@@ -1194,7 +1194,7 @@ public class CommitLog { ...@@ -1194,7 +1194,7 @@ public class CommitLog {
return false; return false;
} }
return mappedFile.appendMessage(data); return mappedFile.appendMessage(data, dataStart, dataLength);
} finally { } finally {
putMessageLock.unlock(); putMessageLock.unlock();
} }
......
...@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore { ...@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore {
} }
@Override @Override
public boolean appendToCommitLog(long startOffset, byte[] data) { public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) { if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false; return false;
} }
boolean result = this.commitLog.appendData(startOffset, data); boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) { if (result) {
this.reputMessageService.wakeup(); this.reputMessageService.wakeup();
} else { } else {
......
...@@ -245,9 +245,11 @@ public interface MessageStore { ...@@ -245,9 +245,11 @@ public interface MessageStore {
* *
* @param startOffset starting offset. * @param startOffset starting offset.
* @param data data to append. * @param data data to append.
* @param dataStart the start index of data array
* @param dataLength the length of data array
* @return true if success; false otherwise. * @return true if success; false otherwise.
*/ */
boolean appendToCommitLog(final long startOffset, final byte[] data); boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
/** /**
* Execute file deletion manually. * Execute file deletion manually.
......
...@@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog {
} }
@Override @Override
public boolean appendData(long startOffset, byte[] data) { public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
//the old ha service will invoke method, here to prevent it //the old ha service will invoke method, here to prevent it
return false; return false;
} }
......
...@@ -440,7 +440,6 @@ public class HAService { ...@@ -440,7 +440,6 @@ public class HAService {
private boolean dispatchReadRequest() { private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) { while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition; int diff = this.byteBufferRead.position() - this.dispatchPosition;
...@@ -459,13 +458,12 @@ public class HAService { ...@@ -459,13 +458,12 @@ public class HAService {
} }
if (diff >= (msgHeaderSize + bodySize)) { if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize]; byte[] bodyData = byteBufferRead.array();
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); int dataStart = this.dispatchPosition + msgHeaderSize;
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize; this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) { if (!reportSlaveMaxOffsetPlus()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册