From 4730987c9cc046c6db980a48aaed9eacc13c630d Mon Sep 17 00:00:00 2001 From: TerrellChen <39452477+TerrellChen@users.noreply.github.com> Date: Tue, 8 Dec 2020 16:58:59 +0800 Subject: [PATCH] [ISSUE #690] Support batch msgs in dledger mode (#2406) * implement issue-690 * add unit test * fix version * fix wroteOffset;update version;polish * polish * fix wrong wroteOffset of AppendMessageResult * move serialization out of lock in async method --- store/pom.xml | 2 +- .../store/dledger/DLedgerCommitLog.java | 376 +++++++++++++++++- .../apache/rocketmq/store/StoreTestBase.java | 68 +++- .../store/dledger/DLedgerCommitlogTest.java | 122 +++++- 4 files changed, 544 insertions(+), 24 deletions(-) diff --git a/store/pom.xml b/store/pom.xml index 8f4b44a2..bcb3e6a5 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -31,7 +31,7 @@ io.openmessaging.storage dledger - 0.2.0 + 0.2.2 org.apache.rocketmq diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 24e0f69c..9a6e7a78 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -17,11 +17,13 @@ package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dledger.AppendFuture; +import io.openmessaging.storage.dledger.BatchAppendFuture; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.MmapFile; @@ -32,6 +34,8 @@ import java.net.Inet6Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; @@ -74,6 +78,8 @@ public class DLedgerCommitLog extends CommitLog { private boolean isInrecoveringOldCommitlog = false; + private final StringBuilder msgIdBuilder = new StringBuilder(); + public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); dLedgerConfig = new DLedgerConfig(); @@ -507,7 +513,129 @@ public class DLedgerCommitLog extends CommitLog { @Override public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + // Set the storage time + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setStoreHostAddressV6Flag(); + } + + // Back to Results + AppendMessageResult appendResult; + BatchAppendFuture dledgerFuture; + EncodeResult encodeResult; + + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + msgIdBuilder.setLength(0); + long elapsedTimeInLock; + long queueOffset; + long msgNum = 0; + try { + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + encodeResult = this.messageSerializer.serialize(messageExtBatch); + queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult + .status)); + } + BatchAppendEntryRequest request = new BatchAppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBatchMsgs(encodeResult.batchData); + dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); + } + long wroteOffset = 0; + + int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); + + boolean isFirstOffset = true; + long firstWroteOffset = 0; + for (long pos : dledgerFuture.getPositions()) { + wroteOffset = pos + DLedgerEntry.BODY_OFFSET; + if (isFirstOffset) { + firstWroteOffset = wroteOffset; + isFirstOffset = false; + } + String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + msgNum++; + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, + msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); + } catch (Exception e) { + log.error("Put message error", e); + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus + .UNKNOWN_ERROR)); + } finally { + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", + elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); + } + + PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; + try { + AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { + case SUCCESS: + putMessageStatus = PutMessageStatus.PUT_OK; + break; + case INCONSISTENT_LEADER: + case NOT_LEADER: + case LEADER_NOT_READY: + case DISK_FULL: + putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; + break; + case WAIT_QUORUM_ACK_TIMEOUT: + //Do not return flush_slave_timeout to the client, for the ons client will ignore it. + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + case LEADER_PENDING_FULL: + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + } + } catch (Throwable t) { + log.error("Failed to get dledger append result", t); + } + + PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); + if (putMessageStatus == PutMessageStatus.PUT_OK) { + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen); + } + return putMessageResult; } @Override @@ -609,7 +737,125 @@ public class DLedgerCommitLog extends CommitLog { @Override public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + + // Set the storage time + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setStoreHostAddressV6Flag(); + } + + // Back to Results + AppendMessageResult appendResult; + BatchAppendFuture dledgerFuture; + EncodeResult encodeResult; + + encodeResult = this.messageSerializer.serialize(messageExtBatch); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult + .status))); + } + + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + msgIdBuilder.setLength(0); + long elapsedTimeInLock; + long queueOffset; + long msgNum = 0; + try { + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + BatchAppendEntryRequest request = new BatchAppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBatchMsgs(encodeResult.batchData); + dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + } + long wroteOffset = 0; + + int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); + + boolean isFirstOffset = true; + long firstWroteOffset = 0; + for (long pos : dledgerFuture.getPositions()) { + wroteOffset = pos + DLedgerEntry.BODY_OFFSET; + if (isFirstOffset) { + firstWroteOffset = wroteOffset; + isFirstOffset = false; + } + String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + msgNum++; + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, + msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + } finally { + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", + elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); + } + + return dledgerFuture.thenApply(appendEntryResponse -> { + PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { + case SUCCESS: + putMessageStatus = PutMessageStatus.PUT_OK; + break; + case INCONSISTENT_LEADER: + case NOT_LEADER: + case LEADER_NOT_READY: + case DISK_FULL: + putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; + break; + case WAIT_QUORUM_ACK_TIMEOUT: + //Do not return flush_slave_timeout to the client, for the ons client will ignore it. + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + case LEADER_PENDING_FULL: + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + } + PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); + if (putMessageStatus == PutMessageStatus.PUT_OK) { + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet(); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes()); + } + return putMessageResult; + }); } @Override @@ -701,7 +947,9 @@ public class DLedgerCommitLog extends CommitLog { class EncodeResult { private String queueOffsetKey; private ByteBuffer data; + private List batchData; private AppendMessageStatus status; + private int totalMsgLen; public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) { this.data = data; @@ -716,6 +964,13 @@ public class DLedgerCommitLog extends CommitLog { public byte[] getData() { return data.array(); } + + public EncodeResult(AppendMessageStatus status, String queueOffsetKey, List batchData, int totalMsgLen) { + this.batchData = batchData; + this.status = status; + this.queueOffsetKey = queueOffsetKey; + this.totalMsgLen = totalMsgLen; + } } class MessageSerializer { @@ -823,6 +1078,123 @@ public class DLedgerCommitLog extends CommitLog { return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key); } + public EncodeResult serialize(final MessageExtBatch messageExtBatch) { + keyBuilder.setLength(0); + keyBuilder.append(messageExtBatch.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExtBatch.getQueueId()); + String key = keyBuilder.toString(); + + Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); + } + + int totalMsgLen = 0; + ByteBuffer messagesByteBuff = messageExtBatch.wrap(); + List batchBody = new LinkedList<>(); + + int sysFlag = messageExtBatch.getSysFlag(); + int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); + ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); + + while (messagesByteBuff.hasRemaining()) { + // 1 TOTALSIZE + messagesByteBuff.getInt(); + // 2 MAGICCODE + messagesByteBuff.getInt(); + // 3 BODYCRC + messagesByteBuff.getInt(); + // 4 FLAG + int flag = messagesByteBuff.getInt(); + // 5 BODY + int bodyLen = messagesByteBuff.getInt(); + int bodyPos = messagesByteBuff.position(); + int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); + messagesByteBuff.position(bodyPos + bodyLen); + // 6 properties + short propertiesLen = messagesByteBuff.getShort(); + int propertiesPos = messagesByteBuff.position(); + messagesByteBuff.position(propertiesPos + propertiesLen); + + final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + + final int topicLength = topicData.length; + + final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen); + ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + + bodyLen + + ", maxMessageSize: " + this.maxMessageSize); + throw new RuntimeException("message size exceeded"); + } + + totalMsgLen += msgLen; + // Determines whether there is sufficient free space + if (totalMsgLen > maxMessageSize) { + throw new RuntimeException("message size exceeded"); + } + + // Initialization of storage space + this.resetByteBuffer(msgStoreItemMemory, msgLen); + // 1 TOTALSIZE + msgStoreItemMemory.putInt(msgLen); + // 2 MAGICCODE + msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + msgStoreItemMemory.putInt(bodyCrc); + // 4 QUEUEID + msgStoreItemMemory.putInt(messageExtBatch.getQueueId()); + // 5 FLAG + msgStoreItemMemory.putInt(flag); + // 6 QUEUEOFFSET + msgStoreItemMemory.putLong(queueOffset++); + // 7 PHYSICALOFFSET + msgStoreItemMemory.putLong(0); + // 8 SYSFLAG + msgStoreItemMemory.putInt(messageExtBatch.getSysFlag()); + // 9 BORNTIMESTAMP + msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp()); + // 10 BORNHOST + resetByteBuffer(bornHostHolder, bornHostLength); + msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder)); + // 11 STORETIMESTAMP + msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + resetByteBuffer(storeHostHolder, storeHostLength); + msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); + // 13 RECONSUMETIMES + msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes()); + // 14 Prepared Transaction Offset + msgStoreItemMemory.putLong(0); + // 15 BODY + msgStoreItemMemory.putInt(bodyLen); + if (bodyLen > 0) { + msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); + } + // 16 TOPIC + msgStoreItemMemory.put((byte) topicLength); + msgStoreItemMemory.put(topicData); + // 17 PROPERTIES + msgStoreItemMemory.putShort(propertiesLen); + if (propertiesLen > 0) { + msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + } + byte[] data = new byte[msgLen]; + msgStoreItemMemory.clear(); + msgStoreItemMemory.get(data); + batchBody.add(data); + } + + return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen); + } + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java index a736754d..5660de13 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java @@ -16,17 +16,19 @@ */ package org.apache.rocketmq.store; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.junit.After; + import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.rocketmq.common.UtilAll; -import org.junit.After; public class StoreTestBase { @@ -44,6 +46,28 @@ public class StoreTestBase { return port.addAndGet(5); } + protected MessageExtBatch buildBatchMessage(int size) { + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic("StoreTest"); + messageExtBatch.setTags("TAG1"); + messageExtBatch.setKeys("Hello"); + messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBatch.setSysFlag(0); + + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(BornHost); + messageExtBatch.setStoreHost(StoreHost); + + List messageList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + messageList.add(buildMessage()); + } + + messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList)); + + return messageExtBatch; + } + protected MessageExtBrokerInner buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic("StoreTest"); @@ -59,6 +83,40 @@ public class StoreTestBase { return msg; } + protected MessageExtBatch buildIPv6HostBatchMessage(int size) { + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic("StoreTest"); + messageExtBatch.setTags("TAG1"); + messageExtBatch.setKeys("Hello"); + messageExtBatch.setBody(MessageBody); + messageExtBatch.setMsgId("24084004018081003FAA1DDE2B3F898A00002A9F0000000000000CA0"); + messageExtBatch.setKeys(String.valueOf(System.currentTimeMillis())); + messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBatch.setSysFlag(0); + messageExtBatch.setBornHostV6Flag(); + messageExtBatch.setStoreHostAddressV6Flag(); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + try { + messageExtBatch.setBornHost(new InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"), 8123)); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + try { + messageExtBatch.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 8123)); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + List messageList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + messageList.add(buildIPv6HostMessage()); + } + + messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList)); + return messageExtBatch; + } + protected MessageExtBrokerInner buildIPv6HostMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic("StoreTest"); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index e31d834e..8ab8a23b 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -19,14 +19,17 @@ package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.MmapFileList; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; @@ -41,7 +44,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { @Test public void testTruncateCQ() throws Exception { - String base = createBaseDir(); + String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); @@ -94,10 +97,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { } - @Test public void testRecover() throws Exception { - String base = createBaseDir(); + String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); @@ -135,10 +137,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { } - @Test public void testPutAndGetMessage() throws Exception { - String base = createBaseDir(); + String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); @@ -148,7 +149,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { List results = new ArrayList<>(); for (int i = 0; i < 10; i++) { MessageExtBrokerInner msgInner = - i < 5 ? buildMessage() : buildIPv6HostMessage(); + i < 5 ? buildMessage() : buildIPv6HostMessage(); msgInner.setTopic(topic); msgInner.setQueueId(0); PutMessageResult putMessageResult = messageStore.putMessage(msgInner); @@ -160,7 +161,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, messageStore.dispatchBehindBytes()); - GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); Assert.assertEquals(10, getMessageResult.getMessageBufferList().size()); @@ -177,9 +178,53 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { messageStore.shutdown(); } + @Test + public void testBatchPutAndGetMessage() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + String topic = UUID.randomUUID().toString(); + // should be less than 4 + int batchMessageSize = 2; + int repeat = 10; + List results = new ArrayList<>(); + for (int i = 0; i < repeat; i++) { + MessageExtBatch messageExtBatch = + i < repeat / 10 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(0); + PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch); + results.add(putMessageResult); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 100, null); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size()); + Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset()); + + for (int i = 0; i < results.size(); i++) { + ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize); + MessageExt messageExt = MessageDecoder.decode(buffer); + Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize); + Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); + } + messageStore.destroy(); + messageStore.shutdown(); + } + @Test public void testAsyncPutAndGetMessage() throws Exception { - String base = createBaseDir(); + String base = createBaseDir(); String peers = String.format("n0-localhost:%d", nextPort()); String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); @@ -189,7 +234,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { List results = new ArrayList<>(); for (int i = 0; i < 10; i++) { MessageExtBrokerInner msgInner = - i < 5 ? buildMessage() : buildIPv6HostMessage(); + i < 5 ? buildMessage() : buildIPv6HostMessage(); msgInner.setTopic(topic); msgInner.setQueueId(0); CompletableFuture futureResult = messageStore.asyncPutMessage(msgInner); @@ -202,7 +247,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, messageStore.dispatchBehindBytes()); - GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); Assert.assertEquals(10, getMessageResult.getMessageBufferList().size()); @@ -219,15 +264,60 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { messageStore.shutdown(); } + @Test + public void testAsyncBatchPutAndGetMessage() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + String topic = UUID.randomUUID().toString(); + // should be less than 4 + int batchMessageSize = 2; + int repeat = 10; + + List results = new ArrayList<>(); + for (int i = 0; i < repeat; i++) { + MessageExtBatch messageExtBatch = + i < 5 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(0); + CompletableFuture futureResult = messageStore.asyncPutMessages(messageExtBatch); + PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS); + results.add(putMessageResult); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size()); + Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset()); + + for (int i = 0; i < results.size(); i++) { + ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize); + MessageExt messageExt = MessageDecoder.decode(buffer); + Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize); + Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); + } + messageStore.destroy(); + messageStore.shutdown(); + } @Test public void testCommittedPos() throws Exception { String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0); + DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); String topic = UUID.randomUUID().toString(); - MessageExtBrokerInner msgInner = buildMessage(); + MessageExtBrokerInner msgInner = buildMessage(); msgInner.setTopic(topic); msgInner.setQueueId(0); PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); @@ -239,7 +329,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); - DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0); + DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0); Thread.sleep(2000); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); @@ -258,10 +348,10 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { public void testIPv6HostMsgCommittedPos() throws Exception { String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String group = UUID.randomUUID().toString(); - DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0); + DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); String topic = UUID.randomUUID().toString(); - MessageExtBrokerInner msgInner = buildIPv6HostMessage(); + MessageExtBrokerInner msgInner = buildIPv6HostMessage(); msgInner.setTopic(topic); msgInner.setQueueId(0); PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); @@ -273,7 +363,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); - DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0); + DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0); Thread.sleep(2000); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); -- GitLab