diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 9241ffe4238cece8af9f58a5be6e54c662ccd524..9a6e7a78a128fc605ea37ed69025686f452b23d5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -426,18 +426,17 @@ public class DLedgerCommitLog extends CommitLog { AppendFuture dledgerFuture; EncodeResult encodeResult; - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); - } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config long elapsedTimeInLock; long queueOffset; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + encodeResult = this.messageSerializer.serialize(msg); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); + encodeResult.setQueueOffsetKey(queueOffset); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); + } AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -543,12 +542,6 @@ public class DLedgerCommitLog extends CommitLog { BatchAppendFuture dledgerFuture; EncodeResult encodeResult; - encodeResult = this.messageSerializer.serialize(messageExtBatch); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult - .status)); - } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config msgIdBuilder.setLength(0); long elapsedTimeInLock; @@ -556,8 +549,12 @@ public class DLedgerCommitLog extends CommitLog { long msgNum = 0; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); + encodeResult = this.messageSerializer.serialize(messageExtBatch); + queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult + .status)); + } BatchAppendEntryRequest request = new BatchAppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -667,7 +664,7 @@ public class DLedgerCommitLog extends CommitLog { try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); + encodeResult.setQueueOffsetKey(queueOffset); AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -782,8 +779,7 @@ public class DLedgerCommitLog extends CommitLog { long msgNum = 0; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); + queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); BatchAppendEntryRequest request = new BatchAppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -961,15 +957,8 @@ public class DLedgerCommitLog extends CommitLog { this.queueOffsetKey = queueOffsetKey; } - public void setQueueOffsetKey(long offset, boolean isBatch) { - if (!isBatch) { - this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); - return; - } - - for (byte[] data : batchData) { - ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++); - } + public void setQueueOffsetKey(long offset) { + data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); } public byte[] getData() { @@ -988,6 +977,8 @@ public class DLedgerCommitLog extends CommitLog { // The maximum length of the message private final int maxMessageSize; + // Build Message Key + private final StringBuilder keyBuilder = new StringBuilder(); MessageSerializer(final int size) { this.maxMessageSize = size; @@ -1088,7 +1079,17 @@ public class DLedgerCommitLog extends CommitLog { } public EncodeResult serialize(final MessageExtBatch messageExtBatch) { - String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId(); + keyBuilder.setLength(0); + keyBuilder.append(messageExtBatch.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExtBatch.getQueueId()); + String key = keyBuilder.toString(); + + Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); + } int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); @@ -1153,7 +1154,7 @@ public class DLedgerCommitLog extends CommitLog { // 5 FLAG msgStoreItemMemory.putInt(flag); // 6 QUEUEOFFSET - msgStoreItemMemory.putLong(0L); + msgStoreItemMemory.putLong(queueOffset++); // 7 PHYSICALOFFSET msgStoreItemMemory.putLong(0); // 8 SYSFLAG @@ -1209,7 +1210,6 @@ public class DLedgerCommitLog extends CommitLog { this.sbr = sbr; } - @Override public synchronized void release() { super.release(); if (sbr != null) {