diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 9a6e7a78a128fc605ea37ed69025686f452b23d5..9241ffe4238cece8af9f58a5be6e54c662ccd524 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog { AppendFuture dledgerFuture; EncodeResult encodeResult; + encodeResult = this.messageSerializer.serialize(msg); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); + } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config long elapsedTimeInLock; long queueOffset; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - encodeResult = this.messageSerializer.serialize(msg); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); - } + encodeResult.setQueueOffsetKey(queueOffset, false); AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog { BatchAppendFuture dledgerFuture; EncodeResult encodeResult; + encodeResult = this.messageSerializer.serialize(messageExtBatch); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult + .status)); + } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config msgIdBuilder.setLength(0); long elapsedTimeInLock; @@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog { long msgNum = 0; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - encodeResult = this.messageSerializer.serialize(messageExtBatch); - queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult - .status)); - } + queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); + encodeResult.setQueueOffsetKey(queueOffset, true); BatchAppendEntryRequest request = new BatchAppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog { try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset); + encodeResult.setQueueOffsetKey(queueOffset, false); AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog { long msgNum = 0; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); + encodeResult.setQueueOffsetKey(queueOffset, true); BatchAppendEntryRequest request = new BatchAppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); @@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog { this.queueOffsetKey = queueOffsetKey; } - public void setQueueOffsetKey(long offset) { - data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); + public void setQueueOffsetKey(long offset, boolean isBatch) { + if (!isBatch) { + this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); + return; + } + + for (byte[] data : batchData) { + ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++); + } } public byte[] getData() { @@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog { // The maximum length of the message private final int maxMessageSize; - // Build Message Key - private final StringBuilder keyBuilder = new StringBuilder(); MessageSerializer(final int size) { this.maxMessageSize = size; @@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog { } public EncodeResult serialize(final MessageExtBatch messageExtBatch) { - keyBuilder.setLength(0); - keyBuilder.append(messageExtBatch.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(messageExtBatch.getQueueId()); - String key = keyBuilder.toString(); - - Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); - if (null == queueOffset) { - queueOffset = 0L; - DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); - } + String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId(); int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); @@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog { // 5 FLAG msgStoreItemMemory.putInt(flag); // 6 QUEUEOFFSET - msgStoreItemMemory.putLong(queueOffset++); + msgStoreItemMemory.putLong(0L); // 7 PHYSICALOFFSET msgStoreItemMemory.putLong(0); // 8 SYSFLAG @@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog { this.sbr = sbr; } + @Override public synchronized void release() { super.release(); if (sbr != null) {