未验证 提交 4730987c 编写于 作者: T TerrellChen 提交者: GitHub

[ISSUE #690] Support batch msgs in dledger mode (#2406)

* implement issue-690

* add unit test

* fix version

* fix wroteOffset;update version;polish

* polish

* fix wrong wroteOffset of AppendMessageResult

* move serialization out of lock in async method
上级 48f076f2
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
<dependency> <dependency>
<groupId>io.openmessaging.storage</groupId> <groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId> <artifactId>dledger</artifactId>
<version>0.2.0</version> <version>0.2.2</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
......
...@@ -17,11 +17,13 @@ ...@@ -17,11 +17,13 @@
package org.apache.rocketmq.store.dledger; package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.AppendFuture; import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.BatchAppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFile; import io.openmessaging.storage.dledger.store.file.MmapFile;
...@@ -32,6 +34,8 @@ import java.net.Inet6Address; ...@@ -32,6 +34,8 @@ import java.net.Inet6Address;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -74,6 +78,8 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -74,6 +78,8 @@ public class DLedgerCommitLog extends CommitLog {
private boolean isInrecoveringOldCommitlog = false; private boolean isInrecoveringOldCommitlog = false;
private final StringBuilder msgIdBuilder = new StringBuilder();
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore); super(defaultMessageStore);
dLedgerConfig = new DLedgerConfig(); dLedgerConfig = new DLedgerConfig();
...@@ -507,9 +513,131 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -507,9 +513,131 @@ public class DLedgerCommitLog extends CommitLog {
@Override @Override
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
// Set the storage time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setStoreHostAddressV6Flag();
}
// Back to Results
AppendMessageResult appendResult;
BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
long queueOffset;
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(messageExtBatch);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}
long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
boolean isFirstOffset = true;
long firstWroteOffset = 0;
for (long pos : dledgerFuture.getPositions()) {
wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
if (isFirstOffset) {
firstWroteOffset = wroteOffset;
isFirstOffset = false;
}
String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
msgNum++;
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
} catch (Exception e) {
log.error("Put message error", e);
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
.UNKNOWN_ERROR));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
}
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
try {
AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
} catch (Throwable t) {
log.error("Failed to get dledger append result", t);
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
}
return putMessageResult;
}
@Override @Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
...@@ -609,8 +737,126 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -609,8 +737,126 @@ public class DLedgerCommitLog extends CommitLog {
@Override @Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) { public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
} }
if (messageExtBatch.getDelayTimeLevel() > 0) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
// Set the storage time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setStoreHostAddressV6Flag();
}
// Back to Results
AppendMessageResult appendResult;
BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status)));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
long queueOffset;
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
boolean isFirstOffset = true;
long firstWroteOffset = 0;
for (long pos : dledgerFuture.getPositions()) {
wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
if (isFirstOffset) {
firstWroteOffset = wroteOffset;
isFirstOffset = false;
}
String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
msgNum++;
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
} catch (Exception e) {
log.error("Put message error", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
}
return dledgerFuture.thenApply(appendEntryResponse -> {
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes());
}
return putMessageResult;
});
}
@Override @Override
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
...@@ -701,7 +947,9 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -701,7 +947,9 @@ public class DLedgerCommitLog extends CommitLog {
class EncodeResult { class EncodeResult {
private String queueOffsetKey; private String queueOffsetKey;
private ByteBuffer data; private ByteBuffer data;
private List<byte[]> batchData;
private AppendMessageStatus status; private AppendMessageStatus status;
private int totalMsgLen;
public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) { public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) {
this.data = data; this.data = data;
...@@ -716,6 +964,13 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -716,6 +964,13 @@ public class DLedgerCommitLog extends CommitLog {
public byte[] getData() { public byte[] getData() {
return data.array(); return data.array();
} }
public EncodeResult(AppendMessageStatus status, String queueOffsetKey, List<byte[]> batchData, int totalMsgLen) {
this.batchData = batchData;
this.status = status;
this.queueOffsetKey = queueOffsetKey;
this.totalMsgLen = totalMsgLen;
}
} }
class MessageSerializer { class MessageSerializer {
...@@ -823,6 +1078,123 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -823,6 +1078,123 @@ public class DLedgerCommitLog extends CommitLog {
return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key); return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key);
} }
public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
List<byte[]> batchBody = new LinkedList<>();
int sysFlag = messageExtBatch.getSysFlag();
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
messagesByteBuff.getInt();
// 2 MAGICCODE
messagesByteBuff.getInt();
// 3 BODYCRC
messagesByteBuff.getInt();
// 4 FLAG
int flag = messagesByteBuff.getInt();
// 5 BODY
int bodyLen = messagesByteBuff.getInt();
int bodyPos = messagesByteBuff.position();
int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
messagesByteBuff.position(bodyPos + bodyLen);
// 6 properties
short propertiesLen = messagesByteBuff.getShort();
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " +
bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
throw new RuntimeException("message size exceeded");
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if (totalMsgLen > maxMessageSize) {
throw new RuntimeException("message size exceeded");
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
msgStoreItemMemory.putInt(bodyCrc);
// 4 QUEUEID
msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
// 5 FLAG
msgStoreItemMemory.putInt(flag);
// 6 QUEUEOFFSET
msgStoreItemMemory.putLong(queueOffset++);
// 7 PHYSICALOFFSET
msgStoreItemMemory.putLong(0);
// 8 SYSFLAG
msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
resetByteBuffer(bornHostHolder, bornHostLength);
msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
resetByteBuffer(storeHostHolder, storeHostLength);
msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset
msgStoreItemMemory.putLong(0);
// 15 BODY
msgStoreItemMemory.putInt(bodyLen);
if (bodyLen > 0) {
msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
}
// 16 TOPIC
msgStoreItemMemory.put((byte) topicLength);
msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
msgStoreItemMemory.putShort(propertiesLen);
if (propertiesLen > 0) {
msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
byte[] data = new byte[msgLen];
msgStoreItemMemory.clear();
msgStoreItemMemory.get(data);
batchBody.add(data);
}
return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip(); byteBuffer.flip();
byteBuffer.limit(limit); byteBuffer.limit(limit);
......
...@@ -16,17 +16,19 @@ ...@@ -16,17 +16,19 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.junit.After;
import java.io.File; import java.io.File;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.HashSet; import java.util.*;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
public class StoreTestBase { public class StoreTestBase {
...@@ -44,6 +46,28 @@ public class StoreTestBase { ...@@ -44,6 +46,28 @@ public class StoreTestBase {
return port.addAndGet(5); return port.addAndGet(5);
} }
protected MessageExtBatch buildBatchMessage(int size) {
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic("StoreTest");
messageExtBatch.setTags("TAG1");
messageExtBatch.setKeys("Hello");
messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
messageExtBatch.setSysFlag(0);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
messageExtBatch.setBornHost(BornHost);
messageExtBatch.setStoreHost(StoreHost);
List<Message> messageList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
messageList.add(buildMessage());
}
messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
return messageExtBatch;
}
protected MessageExtBrokerInner buildMessage() { protected MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner(); MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest"); msg.setTopic("StoreTest");
...@@ -59,6 +83,40 @@ public class StoreTestBase { ...@@ -59,6 +83,40 @@ public class StoreTestBase {
return msg; return msg;
} }
protected MessageExtBatch buildIPv6HostBatchMessage(int size) {
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic("StoreTest");
messageExtBatch.setTags("TAG1");
messageExtBatch.setKeys("Hello");
messageExtBatch.setBody(MessageBody);
messageExtBatch.setMsgId("24084004018081003FAA1DDE2B3F898A00002A9F0000000000000CA0");
messageExtBatch.setKeys(String.valueOf(System.currentTimeMillis()));
messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
messageExtBatch.setSysFlag(0);
messageExtBatch.setBornHostV6Flag();
messageExtBatch.setStoreHostAddressV6Flag();
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
try {
messageExtBatch.setBornHost(new InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"), 8123));
} catch (UnknownHostException e) {
e.printStackTrace();
}
try {
messageExtBatch.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 8123));
} catch (UnknownHostException e) {
e.printStackTrace();
}
List<Message> messageList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
messageList.add(buildIPv6HostMessage());
}
messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
return messageExtBatch;
}
protected MessageExtBrokerInner buildIPv6HostMessage() { protected MessageExtBrokerInner buildIPv6HostMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner(); MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest"); msg.setTopic("StoreTest");
......
...@@ -19,14 +19,17 @@ package org.apache.rocketmq.store.dledger; ...@@ -19,14 +19,17 @@ package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.MmapFileList;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
...@@ -94,7 +97,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -94,7 +97,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
} }
@Test @Test
public void testRecover() throws Exception { public void testRecover() throws Exception {
String base = createBaseDir(); String base = createBaseDir();
...@@ -135,7 +137,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -135,7 +137,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
} }
@Test @Test
public void testPutAndGetMessage() throws Exception { public void testPutAndGetMessage() throws Exception {
String base = createBaseDir(); String base = createBaseDir();
...@@ -177,6 +178,50 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -177,6 +178,50 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
messageStore.shutdown(); messageStore.shutdown();
} }
@Test
public void testBatchPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
// should be less than 4
int batchMessageSize = 2;
int repeat = 10;
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < repeat; i++) {
MessageExtBatch messageExtBatch =
i < repeat / 10 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 100, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
messageStore.shutdown();
}
@Test @Test
public void testAsyncPutAndGetMessage() throws Exception { public void testAsyncPutAndGetMessage() throws Exception {
String base = createBaseDir(); String base = createBaseDir();
...@@ -219,12 +264,57 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -219,12 +264,57 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
messageStore.shutdown(); messageStore.shutdown();
} }
@Test
public void testAsyncBatchPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
// should be less than 4
int batchMessageSize = 2;
int repeat = 10;
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < repeat; i++) {
MessageExtBatch messageExtBatch =
i < 5 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(0);
CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessages(messageExtBatch);
PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
messageStore.shutdown();
}
@Test @Test
public void testCommittedPos() throws Exception { public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString(); String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage(); MessageExtBrokerInner msgInner = buildMessage();
...@@ -239,7 +329,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -239,7 +329,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0); DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
Thread.sleep(2000); Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
...@@ -258,7 +348,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -258,7 +348,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
public void testIPv6HostMsgCommittedPos() throws Exception { public void testIPv6HostMsgCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString(); String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildIPv6HostMessage(); MessageExtBrokerInner msgInner = buildIPv6HostMessage();
...@@ -273,7 +363,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase { ...@@ -273,7 +363,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0); DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
Thread.sleep(2000); Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册