diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index d048dde2b92d3b60035f5764dfd27191417c0ef6..7e86d8419146cf220cef910614f3156efc48fb66 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -42,6 +42,7 @@ public class MessageDecoder { public static final char NAME_VALUE_SEPARATOR = 1; public static final char PROPERTY_SEPARATOR = 2; public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8; + public static final int QUEUE_OFFSET_POSITION = 4 + 4 + 4 + 4 + 4; public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8; // public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE // + 4 // 2 MAGICCODE diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 400ad786dbecc110fdea1b09c9c12705f07e8ba4..24e0f69c45eaa672dfdc573df8939845a71a33c1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -28,6 +28,8 @@ import io.openmessaging.storage.dledger.store.file.MmapFile; import io.openmessaging.storage.dledger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.utils.DLedgerUtils; +import java.net.Inet6Address; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.concurrent.CompletableFuture; @@ -364,21 +366,14 @@ public class DLedgerCommitLog extends CommitLog { return beginTimeInDledgerLock; } - @Override - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { + private void setMessageInfo(MessageExtBrokerInner msg, int tranType) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - String topic = msg.getTopic(); - int queueId = msg.getQueueId(); - //should be consistent with the old version - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery @@ -387,8 +382,9 @@ public class DLedgerCommitLog extends CommitLog { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } - topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; - queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); + + String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; + int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); @@ -400,6 +396,25 @@ public class DLedgerCommitLog extends CommitLog { } } + InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + msg.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + msg.setStoreHostAddressV6Flag(); + } + } + + @Override + public PutMessageResult putMessage(final MessageExtBrokerInner msg) { + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + String topic = msg.getTopic(); + setMessageInfo(msg,tranType); + // Back to Results AppendMessageResult appendResult; AppendFuture dledgerFuture; @@ -411,14 +426,15 @@ public class DLedgerCommitLog extends CommitLog { try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); encodeResult = this.messageSerializer.serialize(msg); - queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); + encodeResult.setQueueOffsetKey(queueOffset); if (encodeResult.status != AppendMessageStatus.PUT_OK) { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); } AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.data); + request.setBody(encodeResult.getData()); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); @@ -430,7 +446,7 @@ public class DLedgerCommitLog extends CommitLog { String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: @@ -496,12 +512,104 @@ public class DLedgerCommitLog extends CommitLog { @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { - return CompletableFuture.completedFuture(this.putMessage(msg)); + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + + setMessageInfo(msg, tranType); + + final String finalTopic = msg.getTopic(); + + // Back to Results + AppendMessageResult appendResult; + AppendFuture dledgerFuture; + EncodeResult encodeResult; + + encodeResult = this.messageSerializer.serialize(msg); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); + } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + long elapsedTimeInLock; + long queueOffset; + try { + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); + encodeResult.setQueueOffsetKey(queueOffset); + AppendEntryRequest request = new AppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBody(encodeResult.getData()); + dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + } + long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; + + int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); + + String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + switch (tranType) { + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + // The next update ConsumeQueue information + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); + break; + default: + break; + } + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + } finally { + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); + } + + return dledgerFuture.thenApply(appendEntryResponse -> { + PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { + case SUCCESS: + putMessageStatus = PutMessageStatus.PUT_OK; + break; + case INCONSISTENT_LEADER: + case NOT_LEADER: + case LEADER_NOT_READY: + case DISK_FULL: + putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; + break; + case WAIT_QUORUM_ACK_TIMEOUT: + //Do not return flush_slave_timeout to the client, for the ons client will ignore it. + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + case LEADER_PENDING_FULL: + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + } + PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); + if (putMessageStatus == PutMessageStatus.PUT_OK) { + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet(); + storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes()); + } + return putMessageResult; + }); } @Override public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { - return CompletableFuture.completedFuture(putMessages(messageExtBatch)); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); } @Override @@ -566,51 +674,69 @@ public class DLedgerCommitLog extends CommitLog { return diff; } + private long getQueueOffsetByKey(String key, int tranType) { + Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); + } + + // Transaction messages that require special handling + switch (tranType) { + // Prepared and Rollback message is not consumed, will not enter the + // consumer queuec + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + queueOffset = 0L; + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + default: + break; + } + return queueOffset; + } + + class EncodeResult { private String queueOffsetKey; - private byte[] data; + private ByteBuffer data; private AppendMessageStatus status; - public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey) { + public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) { this.data = data; this.status = status; this.queueOffsetKey = queueOffsetKey; } + + public void setQueueOffsetKey(long offset) { + data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); + } + + public byte[] getData() { + return data.array(); + } } class MessageSerializer { - // File at the end of the minimum fixed length empty - private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; - private final ByteBuffer msgIdMemory; - private final ByteBuffer msgIdV6Memory; - // Store the message content - private final ByteBuffer msgStoreItemMemory; + // The maximum length of the message private final int maxMessageSize; // Build Message Key private final StringBuilder keyBuilder = new StringBuilder(); - private final StringBuilder msgIdBuilder = new StringBuilder(); - -// private final ByteBuffer hostHolder = ByteBuffer.allocate(8); - MessageSerializer(final int size) { - this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8); - this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8); - this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; } - public ByteBuffer getMsgStoreItemMemory() { - return msgStoreItemMemory; - } - public EncodeResult serialize(final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET long wroteOffset = 0; + long queueOffset = 0; + int sysflag = msgInner.getSysFlag(); int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; @@ -618,33 +744,7 @@ public class DLedgerCommitLog extends CommitLog { ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); - // Record ConsumeQueue information - keyBuilder.setLength(0); - keyBuilder.append(msgInner.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(msgInner.getQueueId()); - String key = keyBuilder.toString(); - - Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); - if (null == queueOffset) { - queueOffset = 0L; - DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); - } - - // Transaction messages that require special handling - final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); - switch (tranType) { - // Prepared and Rollback message is not consumed, will not enter the - // consumer queuec - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - queueOffset = 0L; - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - default: - break; - } + String key = msgInner.getTopic() + "-" + msgInner.getQueueId(); /** * Serialize message @@ -666,6 +766,8 @@ public class DLedgerCommitLog extends CommitLog { final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); + ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); + // Exceeds the maximum message if (msgLen > this.maxMessageSize) { DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength @@ -675,60 +777,56 @@ public class DLedgerCommitLog extends CommitLog { // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE - this.msgStoreItemMemory.putInt(msgLen); + msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE - this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); + msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC - this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); + msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID - this.msgStoreItemMemory.putInt(msgInner.getQueueId()); + msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG - this.msgStoreItemMemory.putInt(msgInner.getFlag()); + msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET - this.msgStoreItemMemory.putLong(queueOffset); + msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET - this.msgStoreItemMemory.putLong(wroteOffset); + msgStoreItemMemory.putLong(wroteOffset); // 8 SYSFLAG - this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); + msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); + msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST - this.resetByteBuffer(bornHostHolder, bornHostLength); - this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); + resetByteBuffer(bornHostHolder, bornHostLength); + msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); + msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS - this.resetByteBuffer(storeHostHolder, storeHostLength); - this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); + resetByteBuffer(storeHostHolder, storeHostLength); + msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES - this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); + msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset - this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); + msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY - this.msgStoreItemMemory.putInt(bodyLength); + msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) { - this.msgStoreItemMemory.put(msgInner.getBody()); + msgStoreItemMemory.put(msgInner.getBody()); } // 16 TOPIC - this.msgStoreItemMemory.put((byte) topicLength); - this.msgStoreItemMemory.put(topicData); + msgStoreItemMemory.put((byte) topicLength); + msgStoreItemMemory.put(topicData); // 17 PROPERTIES - this.msgStoreItemMemory.putShort((short) propertiesLength); + msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) { - this.msgStoreItemMemory.put(propertiesData); + msgStoreItemMemory.put(propertiesData); } - byte[] data = new byte[msgLen]; - this.msgStoreItemMemory.clear(); - this.msgStoreItemMemory.get(data); - return new EncodeResult(AppendMessageStatus.PUT_OK, data, key); + return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key); } private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } - } public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index f0b9205302e965a832393d788d122e64bbb5d0aa..e31d834e529d796be4a54f0324e49aa85d36efc2 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.DefaultMessageStore; @@ -175,6 +177,48 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { messageStore.shutdown(); } + @Test + public void testAsyncPutAndGetMessage() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + String topic = UUID.randomUUID().toString(); + + List results = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner msgInner = + i < 5 ? buildMessage() : buildIPv6HostMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + CompletableFuture futureResult = messageStore.asyncPutMessage(msgInner); + PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS); + results.add(putMessageResult); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + Assert.assertEquals(10, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(10, getMessageResult.getMessageMapedList().size()); + + for (int i = 0; i < results.size(); i++) { + ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i); + MessageExt messageExt = MessageDecoder.decode(buffer); + Assert.assertEquals(i, messageExt.getQueueOffset()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); + } + messageStore.destroy(); + messageStore.shutdown(); + } + @Test public void testCommittedPos() throws Exception {