未验证 提交 845c5fd6 编写于 作者: R rongtong 提交者: GitHub

[ISSUE #1846] Dledger model change into pipeline manner to improve performance (#1847)

* enhancement(dledger):implement asyncPutMessage in dledger commitlog

* enhancement(dledger):move serialization out of lock

* fix(dledger):fix the issue that queueOffset is overwritten

* fix(dledger):fix the issue that get wrong queueOffset

* test(dledger):add dledgerCommitlog put messages async unit test

* chore(dledger): fix the issue that cannot find symbol of variable SCHEDULE_TOPIC
上级 3be7033a
......@@ -42,6 +42,7 @@ public class MessageDecoder {
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int QUEUE_OFFSET_POSITION = 4 + 4 + 4 + 4 + 4;
public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8;
// public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
// + 4 // 2 MAGICCODE
......
......@@ -28,6 +28,8 @@ import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
......@@ -364,21 +366,14 @@ public class DLedgerCommitLog extends CommitLog {
return beginTimeInDledgerLock;
}
@Override
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
private void setMessageInfo(MessageExtBrokerInner msg, int tranType) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
//should be consistent with the old version
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
......@@ -387,8 +382,9 @@ public class DLedgerCommitLog extends CommitLog {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
......@@ -400,6 +396,25 @@ public class DLedgerCommitLog extends CommitLog {
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
}
@Override
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
String topic = msg.getTopic();
setMessageInfo(msg,tranType);
// Back to Results
AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture;
......@@ -411,14 +426,15 @@ public class DLedgerCommitLog extends CommitLog {
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
......@@ -430,7 +446,7 @@ public class DLedgerCommitLog extends CommitLog {
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
......@@ -496,12 +512,104 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(this.putMessage(msg));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
setMessageInfo(msg, tranType);
final String finalTopic = msg.getTopic();
// Back to Results
AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
} catch (Exception e) {
log.error("Put message error", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
}
return dledgerFuture.thenApply(appendEntryResponse -> {
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes());
}
return putMessageResult;
});
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
@Override
......@@ -566,51 +674,69 @@ public class DLedgerCommitLog extends CommitLog {
return diff;
}
private long getQueueOffsetByKey(String key, int tranType) {
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
return queueOffset;
}
class EncodeResult {
private String queueOffsetKey;
private byte[] data;
private ByteBuffer data;
private AppendMessageStatus status;
public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey) {
public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) {
this.data = data;
this.status = status;
this.queueOffsetKey = queueOffsetKey;
}
public void setQueueOffsetKey(long offset) {
data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
}
public byte[] getData() {
return data.array();
}
}
class MessageSerializer {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private final ByteBuffer msgIdMemory;
private final ByteBuffer msgIdV6Memory;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
// The maximum length of the message
private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
private final StringBuilder msgIdBuilder = new StringBuilder();
// private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
MessageSerializer(final int size) {
this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
public ByteBuffer getMsgStoreItemMemory() {
return msgStoreItemMemory;
}
public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = 0;
long queueOffset = 0;
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
......@@ -618,33 +744,7 @@ public class DLedgerCommitLog extends CommitLog {
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
/**
* Serialize message
......@@ -666,6 +766,8 @@ public class DLedgerCommitLog extends CommitLog {
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
......@@ -675,60 +777,56 @@ public class DLedgerCommitLog extends CommitLog {
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(wroteOffset);
msgStoreItemMemory.putLong(wroteOffset);
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
resetByteBuffer(bornHostHolder, bornHostLength);
msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
resetByteBuffer(storeHostHolder, storeHostLength);
msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0) {
this.msgStoreItemMemory.put(msgInner.getBody());
msgStoreItemMemory.put(msgInner.getBody());
}
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
msgStoreItemMemory.put((byte) topicLength);
msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0) {
this.msgStoreItemMemory.put(propertiesData);
msgStoreItemMemory.put(propertiesData);
}
byte[] data = new byte[msgLen];
this.msgStoreItemMemory.clear();
this.msgStoreItemMemory.get(data);
return new EncodeResult(AppendMessageStatus.PUT_OK, data, key);
return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key);
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
}
public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult {
......
......@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
......@@ -175,6 +177,48 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
messageStore.shutdown();
}
@Test
public void testAsyncPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner =
i < 5 ? buildMessage() : buildIPv6HostMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessage(msgInner);
PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
messageStore.shutdown();
}
@Test
public void testCommittedPos() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册