diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 1eb9f4d9948dde797218908789a249bc478bb5f8..af0b638d3d0ff62bed2718708255e52b26d8c85f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
+ public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
diff --git a/pom.xml b/pom.xml
index 0a8fef87584e1fe741b574361e34253008b75106..148cc99b2999c786284a5c8d36e1bf7b9aa7017e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,8 +100,8 @@
false
true
- 1.7
- 1.7
+ 1.8
+ 1.8
jacoco
${project.basedir}/../test/target/jacoco-it.exec
diff --git a/store/pom.xml b/store/pom.xml
index 59be3739dedfb37f84eb8566c0846ea8881d0b30..7de04fcae5f377bffa61bff52593c6c565282f2f 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -28,6 +28,21 @@
rocketmq-store ${project.version}
+
+ org.apache.rocketmq
+ rocketmq-dleger
+ 0.1-SNAPSHOT
+
+
+ org.apache.rocketmq
+ rocketmq-remoting
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
${project.groupId}
rocketmq-common
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 03b1151643ff7e750902e3ed1f7a79f6a9da1895..a1c329905045a9b1c87e0769e5b39c2a8d776261 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
- private final static int BLANK_MAGIC_CODE = -875286124;
+ protected final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue;
- private final DefaultMessageStore defaultMessageStore;
+ protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
@@ -57,11 +57,11 @@ public class CommitLog {
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal batchEncoderThreadLocal;
- private HashMap topicQueueTable = new HashMap(1024);
- private volatile long confirmOffset = -1L;
+ protected HashMap topicQueueTable = new HashMap(1024);
+ protected volatile long confirmOffset = -1L;
- private volatile long beginTimeInLock = 0;
- private final PutMessageLock putMessageLock;
+ protected volatile long beginTimeInLock = 0;
+ protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
@@ -366,7 +366,7 @@ public class CommitLog {
return new DispatchRequest(-1, false /* success */);
}
- private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
+ protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 08c7f99906962f20c748ba29bf12f58c81e8035c..f5a1250d477f7bfe4808b78d115871ad6615ef89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -348,7 +348,7 @@ public class ConsumeQueue {
long tagsCode = result.getByteBuffer().getLong();
if (offsetPy >= phyMinOffset) {
- this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
+ this.minLogicOffset = mappedFile.getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file.
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e0aef4f37450dd546d49687d2af96ec212bbe3e3..76be492ba468a5efb595536cec5fdbfc4586412f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.dleger.DLegerCommitLog;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
@@ -119,7 +120,11 @@ public class DefaultMessageStore implements MessageStore {
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
- this.commitLog = new CommitLog(this);
+ if (messageStoreConfig.isEnableDLegerCommitLog()) {
+ this.commitLog = new DLegerCommitLog(this);
+ } else {
+ this.commitLog = new CommitLog(this);
+ }
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService();
@@ -1763,7 +1768,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
- int size = dispatchRequest.getMsgSize();
+ int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index 819bb948c758d7ecfab5a982c8b5454b5907daad..89d47ced5ba55b344c5376b0bcd6aa7871b11940 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -22,7 +22,7 @@ public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
- private final int msgSize;
+ private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
@@ -35,6 +35,8 @@ public class DispatchRequest {
private final Map propertiesMap;
private byte[] bitMap;
+ private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
+
public DispatchRequest(
final String topic,
final int queueId,
@@ -156,4 +158,16 @@ public class DispatchRequest {
public void setBitMap(byte[] bitMap) {
this.bitMap = bitMap;
}
+
+ public void setMsgSize(int msgSize) {
+ this.msgSize = msgSize;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 7a17114c8f3627bec4d3bc6086118547ac78052c..1f826fedd891b6ff9c55ec56f2b48b2bbdbbe558 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -48,9 +48,9 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size);
}
- public MappedFile getMappedFile() {
+ /* public MappedFile getMappedFile() {
return mappedFile;
- }
+ }*/
// @Override
// protected void finalize() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 02aa84a3e6b590b43901e529b810b4812144ff72..63c7c313fc2370b073c737d94eddb093d2e317a3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -143,6 +143,11 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
+ private boolean enableDLegerCommitLog;
+ private String dLegerGroup;
+ private String dLegerPeers;
+ private String dLegerSelfId;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -666,4 +671,35 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
+ public String getdLegerGroup() {
+ return dLegerGroup;
+ }
+
+ public void setdLegerGroup(String dLegerGroup) {
+ this.dLegerGroup = dLegerGroup;
+ }
+
+ public String getdLegerPeers() {
+ return dLegerPeers;
+ }
+
+ public void setdLegerPeers(String dLegerPeers) {
+ this.dLegerPeers = dLegerPeers;
+ }
+
+ public String getdLegerSelfId() {
+ return dLegerSelfId;
+ }
+
+ public void setdLegerSelfId(String dLegerSelfId) {
+ this.dLegerSelfId = dLegerSelfId;
+ }
+
+ public boolean isEnableDLegerCommitLog() {
+ return enableDLegerCommitLog;
+ }
+
+ public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
+ this.enableDLegerCommitLog = enableDLegerCommitLog;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java
new file mode 100644
index 0000000000000000000000000000000000000000..272a1868ba02c461c67c608b44c2e078bde3b0f3
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.dleger;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.dleger.DLegerConfig;
+import org.apache.rocketmq.dleger.DLegerServer;
+import org.apache.rocketmq.dleger.entry.DLegerEntry;
+import org.apache.rocketmq.dleger.protocol.AppendEntryRequest;
+import org.apache.rocketmq.dleger.protocol.AppendEntryResponse;
+import org.apache.rocketmq.dleger.protocol.DLegerResponseCode;
+import org.apache.rocketmq.dleger.store.file.DLegerMmapFileStore;
+import org.apache.rocketmq.dleger.store.file.MmapFile;
+import org.apache.rocketmq.dleger.store.file.MmapFileList;
+import org.apache.rocketmq.dleger.store.file.SelectMmapBufferResult;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.CommitLog;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.StoreStatsService;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+
+/**
+ * Store all metadata downtime for recovery, data protection reliability
+ */
+public class DLegerCommitLog extends CommitLog {
+ private final DLegerServer dLegerServer;
+ private final DLegerMmapFileStore dLegerFileStore;
+ private final MmapFileList dLegerFileList;
+
+
+
+ private final MessageSerializer messageSerializer;
+
+ private volatile long beginTimeInLock = 0;
+
+ public DLegerCommitLog(final DefaultMessageStore defaultMessageStore) {
+ super(defaultMessageStore);
+ DLegerConfig dLegerConfig = new DLegerConfig();
+ dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
+ dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
+ dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
+ dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
+ dLegerServer = new DLegerServer(dLegerConfig);
+ dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore();
+ DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
+ assert bodyOffset == DLegerEntry.BODY_OFFSET;
+ buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
+ buffer.putLong(entry.getPos() + bodyOffset);
+ };
+ dLegerFileStore.addAppendHook(appendHook);
+ dLegerFileList = dLegerFileStore.getDataFileList();
+ this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+
+ }
+
+ public boolean load() {
+ /*boolean result = this.mappedFileQueue.load();
+ log.info("load commit log " + (result ? "OK" : "Failed"));
+ return result;*/
+ return true;
+ }
+
+ public void start() {
+ dLegerServer.startup();
+ /* this.flushCommitLogService.start();
+
+ if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ this.commitLogService.start();
+ }*/
+ }
+
+ public void shutdown() {
+ /* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ this.commitLogService.shutdown();
+ }
+ this.flushCommitLogService.shutdown();*/
+ dLegerServer.shutdown();
+ }
+
+ public long flush() {
+ dLegerFileStore.flush();
+ return dLegerFileList.getFlushedWhere();
+ }
+
+ public long getMaxOffset() {
+ return this.dLegerFileList.getMaxWrotePosition();
+ }
+
+ public long remainHowManyDataToCommit() {
+ return dLegerFileList.remainHowManyDataToCommit();
+ }
+
+ public long remainHowManyDataToFlush() {
+ return dLegerFileList.remainHowManyDataToFlush();
+ }
+
+ public int deleteExpiredFile(
+ final long expiredTime,
+ final int deleteFilesInterval,
+ final long intervalForcibly,
+ final boolean cleanImmediately
+ ) {
+ return 0;
+ }
+
+ /**
+ * Read CommitLog data, use data replication
+ */
+ public SelectMappedBufferResult getData(final long offset) {
+ return this.getData(offset, offset == 0);
+ }
+
+ private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
+
+ private SelectMmapBufferResult sbr;
+ public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
+ super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
+ this.sbr = sbr;
+ }
+
+ public synchronized void release() {
+ super.release();
+ sbr.release();
+ }
+
+ }
+
+ public SelectMappedBufferResult convertSbr(SelectMmapBufferResult sbr) {
+ if (sbr == null) {
+ return null;
+ } else {
+ return new DLegerSelectMappedBufferResult(sbr);
+ }
+
+ }
+
+
+ public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
+ int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
+ MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
+ if (mappedFile != null) {
+ int pos = (int) (offset % mappedFileSize);
+ SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos);
+ return convertSbr(sbr);
+ }
+
+ return null;
+ }
+
+ /**
+ * When the normal exit, data recovery, all memory data have been flush
+ */
+ public void recoverNormally() {
+
+ }
+ public void recoverAbnormally() {
+
+ }
+
+ public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) {
+ return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
+ }
+
+ private void doNothingForDeadCode(final Object obj) {
+ if (obj != null) {
+ log.debug(String.valueOf(obj.hashCode()));
+ }
+ }
+
+ /**
+ * check the message and returns the message size
+ *
+ * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
+ */
+ public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
+ final boolean readBody) {
+ try {
+ int bodyOffset = DLegerEntry.BODY_OFFSET;
+ byteBuffer.position(byteBuffer.position() + bodyOffset);
+ DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
+ if (dispatchRequest.isSuccess()) {
+ dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
+ } else if (dispatchRequest.getMsgSize() > 0) {
+ dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
+ }
+ return dispatchRequest;
+ } catch (Exception e) {
+ }
+
+ return new DispatchRequest(-1, false /* success */);
+ }
+
+ public long getConfirmOffset() {
+ return this.confirmOffset;
+ }
+
+ public void setConfirmOffset(long phyOffset) {
+ this.confirmOffset = phyOffset;
+ }
+
+
+
+
+ private void notifyMessageArriving() {
+
+ }
+
+ public boolean resetOffset(long offset) {
+ //return this.mappedFileQueue.resetOffset(offset);
+ return false;
+ }
+
+ public long getBeginTimeInLock() {
+ return beginTimeInLock;
+ }
+
+ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+ // Set the storage time
+ msg.setStoreTimestamp(System.currentTimeMillis());
+ // Set the message body BODY CRC (consider the most appropriate setting
+ // on the client)
+ msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+
+ StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+ String topic = msg.getTopic();
+ int queueId = msg.getQueueId();
+
+ //should be consistent with the old version
+ final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+ if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
+ || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+ // Delay Delivery
+ if (msg.getDelayTimeLevel() > 0) {
+ if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+ msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
+ }
+
+ topic = ScheduleMessageService.SCHEDULE_TOPIC;
+ queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+ // Backup real topic, queueId
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
+ msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ msg.setTopic(topic);
+ msg.setQueueId(queueId);
+ }
+ }
+
+ // Back to Results
+ AppendMessageResult appendResult = null;
+ PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+ CompletableFuture dlegerFuture = null;
+ EncodeResult encodeResult = null;
+ long eclipseTimeInLock = 0L;
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+ long queueOffset = -1;
+ try {
+ beginTimeInLock = this.defaultMessageStore.getSystemClock().now();
+ //TO DO use buffer
+ encodeResult = this.messageSerializer.serialize(msg);
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ appendResult = new AppendMessageResult(encodeResult.status);
+ switch (encodeResult.status) {
+ case PROPERTIES_SIZE_EXCEEDED:
+ case MESSAGE_SIZE_EXCEEDED:
+ putMessageStatus = PutMessageStatus.MESSAGE_ILLEGAL;
+ break;
+ }
+ } else {
+ AppendEntryRequest request = new AppendEntryRequest();
+ request.setRemoteId(dLegerServer.getMemberState().getSelfId());
+ request.setBody(encodeResult.data);
+ dlegerFuture = dLegerServer.handleAppend(request);
+ if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLegerResponseCode.SUCCESS.getCode()) {
+ //TO DO make sure the local store is ok
+ appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ } else {
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ break;
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ // The next update ConsumeQueue information
+ DLegerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInLock;
+ } catch (Exception e) {
+ log.error("Put message error", e);
+ appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ } finally {
+ putMessageLock.unlock();
+ }
+
+ if (eclipseTimeInLock > 500) {
+ log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult);
+ }
+
+
+ if (dlegerFuture != null) {
+ try {
+ AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS);
+ switch (DLegerResponseCode.valueOf(appendEntryResponse.getCode())) {
+ case SUCCESS:
+ putMessageStatus = PutMessageStatus.PUT_OK;
+ long wroteOffset = appendEntryResponse.getPos() + DLegerEntry.BODY_OFFSET;
+ ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+ String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0);
+ break;
+ case INCONSISTENT_LEADER:
+ case NOT_LEADER:
+ case NOT_READY:
+ case DISK_FULL:
+ putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+ break;
+ case LEADER_PENDING_FULL:
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ }
+ } catch (Exception ignored) {
+ putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
+ appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ }
+ }
+
+ PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+ if (putMessageStatus == PutMessageStatus.PUT_OK) {
+ // Statistics
+ storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
+ storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
+ }
+ return putMessageResult;
+ }
+
+ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
+
+ }
+
+ public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
+
+ }
+
+ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+
+ /**
+ * According to receive certain message or offset storage time if an error occurs, it returns -1
+ */
+ public long pickupStoreTimestamp(final long offset, final int size) {
+ if (offset >= this.getMinOffset()) {
+ SelectMappedBufferResult result = this.getMessage(offset, size);
+ if (null != result) {
+ try {
+ return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+ } finally {
+ result.release();
+ }
+ }
+ }
+
+ return -1;
+ }
+
+ public long getMinOffset() {
+ return dLegerFileList.getMinOffset();
+ }
+
+ public SelectMappedBufferResult getMessage(final long offset, final int size) {
+ int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
+ MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);
+ if (mappedFile != null) {
+ int pos = (int) (offset % mappedFileSize);
+ return convertSbr(mappedFile.selectMappedBuffer(pos, size));
+ }
+ return null;
+ }
+
+ public long rollNextFile(final long offset) {
+ int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+ return offset + mappedFileSize - offset % mappedFileSize;
+ }
+
+ public HashMap getTopicQueueTable() {
+ return topicQueueTable;
+ }
+
+ public void setTopicQueueTable(HashMap topicQueueTable) {
+ this.topicQueueTable = topicQueueTable;
+ }
+
+ public void destroy() {
+ //TO DO
+ }
+
+ public boolean appendData(long startOffset, byte[] data) {
+ //TO DO
+ return false;
+ }
+
+ public boolean retryDeleteFirstFile(final long intervalForcibly) {
+ //TO DO
+ return false;
+ }
+
+ public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
+ String key = topic + "-" + queueId;
+ synchronized (this) {
+ this.topicQueueTable.remove(key);
+ }
+
+ log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
+ }
+
+ public void checkSelf() {
+ dLegerFileList.checkSelf();
+ }
+
+ public long lockTimeMills() {
+ long diff = 0;
+ long begin = this.beginTimeInLock;
+ if (begin > 0) {
+ diff = this.defaultMessageStore.now() - begin;
+ }
+
+ if (diff < 0) {
+ diff = 0;
+ }
+
+ return diff;
+ }
+
+ class EncodeResult {
+ private String queueOffsetKey;
+ private byte[] data;
+ private AppendMessageStatus status;
+ public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey) {
+ this.data = data;
+ this.status = status;
+ this.queueOffsetKey = queueOffsetKey;
+ }
+ }
+
+ class MessageSerializer {
+ // File at the end of the minimum fixed length empty
+ private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
+ private final ByteBuffer msgIdMemory;
+ // Store the message content
+ private final ByteBuffer msgStoreItemMemory;
+ // The maximum length of the message
+ private final int maxMessageSize;
+ // Build Message Key
+ private final StringBuilder keyBuilder = new StringBuilder();
+
+ private final StringBuilder msgIdBuilder = new StringBuilder();
+
+ private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
+
+ MessageSerializer(final int size) {
+ this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+ this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
+ this.maxMessageSize = size;
+ }
+
+ public ByteBuffer getMsgStoreItemMemory() {
+ return msgStoreItemMemory;
+ }
+
+ public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
+ // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+
+ // PHY OFFSET
+ long wroteOffset = 0;
+
+ this.resetByteBuffer(hostHolder, 8);
+ // Record ConsumeQueue information
+ keyBuilder.setLength(0);
+ keyBuilder.append(msgInner.getTopic());
+ keyBuilder.append('-');
+ keyBuilder.append(msgInner.getQueueId());
+ String key = keyBuilder.toString();
+
+ Long queueOffset = DLegerCommitLog.this.topicQueueTable.get(key);
+ if (null == queueOffset) {
+ queueOffset = 0L;
+ DLegerCommitLog.this.topicQueueTable.put(key, queueOffset);
+ }
+
+ // Transaction messages that require special handling
+ final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
+ switch (tranType) {
+ // Prepared and Rollback message is not consumed, will not enter the
+ // consumer queuec
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ queueOffset = 0L;
+ break;
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ default:
+ break;
+ }
+
+ /**
+ * Serialize message
+ */
+ final byte[] propertiesData =
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+ final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+
+ if (propertiesLength > Short.MAX_VALUE) {
+ log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+ return new EncodeResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED, null, key);
+ }
+
+ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ final int topicLength = topicData.length;
+
+ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+ final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
+
+ // Exceeds the maximum message
+ if (msgLen > this.maxMessageSize) {
+ DLegerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ + ", maxMessageSize: " + this.maxMessageSize);
+ return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key);
+ }
+ // Initialization of storage space
+ this.resetByteBuffer(msgStoreItemMemory, msgLen);
+ // 1 TOTALSIZE
+ this.msgStoreItemMemory.putInt(msgLen);
+ // 2 MAGICCODE
+ this.msgStoreItemMemory.putInt(DLegerCommitLog.MESSAGE_MAGIC_CODE);
+ // 3 BODYCRC
+ this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+ // 4 QUEUEID
+ this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+ // 5 FLAG
+ this.msgStoreItemMemory.putInt(msgInner.getFlag());
+ // 6 QUEUEOFFSET
+ this.msgStoreItemMemory.putLong(queueOffset);
+ // 7 PHYSICALOFFSET
+ this.msgStoreItemMemory.putLong(wroteOffset);
+ // 8 SYSFLAG
+ this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+ // 9 BORNTIMESTAMP
+ this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+ // 10 BORNHOST
+ this.resetByteBuffer(hostHolder, 8);
+ this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
+ // 11 STORETIMESTAMP
+ this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+ // 12 STOREHOSTADDRESS
+ this.resetByteBuffer(hostHolder, 8);
+ this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
+ //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
+ // 13 RECONSUMETIMES
+ this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+ // 15 BODY
+ this.msgStoreItemMemory.putInt(bodyLength);
+ if (bodyLength > 0)
+ this.msgStoreItemMemory.put(msgInner.getBody());
+ // 16 TOPIC
+ this.msgStoreItemMemory.put((byte) topicLength);
+ this.msgStoreItemMemory.put(topicData);
+ // 17 PROPERTIES
+ this.msgStoreItemMemory.putShort((short) propertiesLength);
+ if (propertiesLength > 0)
+ this.msgStoreItemMemory.put(propertiesData);
+
+ final long beginTimeMills = DLegerCommitLog.this.defaultMessageStore.now();
+
+ byte[] data = new byte[msgLen];
+ this.msgStoreItemMemory.clear();
+ this.msgStoreItemMemory.get(data);
+ return new EncodeResult(AppendMessageStatus.PUT_OK, data, key);
+ }
+
+ private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
+ byteBuffer.flip();
+ byteBuffer.limit(limit);
+ }
+
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java
new file mode 100644
index 0000000000000000000000000000000000000000..2d21af4a57d22d57b9d473dc2eadf74802c885e6
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.dleger;
+
+import org.apache.rocketmq.dleger.store.file.SelectMmapBufferResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+
+public class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
+
+ private SelectMmapBufferResult sbr;
+ public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
+ super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
+ this.sbr = sbr;
+ }
+
+ @Override
+ public synchronized void release() {
+ super.release();
+ sbr.release();
+ }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
new file mode 100644
index 0000000000000000000000000000000000000000..636386754e5a875a112a35775c7bc4f612ccb9ae
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
@@ -0,0 +1,69 @@
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.UtilAll;
+import org.junit.After;
+
+public class StoreTestBase {
+
+ private int QUEUE_TOTAL = 100;
+ private AtomicInteger QueueId = new AtomicInteger(0);
+ private SocketAddress BornHost = new InetSocketAddress("127.0.0.1", 8123);
+ private SocketAddress StoreHost = BornHost;
+ private byte[] MessageBody = new byte[1024];
+
+ protected Set baseDirs = new HashSet<>();
+
+ private AtomicInteger port = new AtomicInteger(30000);
+
+ public int nextPort() {
+ return port.incrementAndGet();
+ }
+
+ protected MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic("StoreTest");
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(MessageBody);
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+ msg.setSysFlag(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(StoreHost);
+ msg.setBornHost(BornHost);
+ return msg;
+ }
+
+ public static String createBaseDir() {
+ String baseDir = System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + UUID.randomUUID();
+ final File file = new File(baseDir);
+ if (file.exists()) {
+ System.exit(1);
+ }
+ return baseDir;
+ }
+
+ public static void deleteFile(String fileName) {
+ deleteFile(new File(fileName));
+ }
+
+ public static void deleteFile(File file) {
+ UtilAll.deleteFile(file);
+ }
+
+ @After
+ public void clear() {
+ for (String baseDir : baseDirs) {
+ deleteFile(baseDir);
+ }
+ }
+
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..8ad58c8f403001bd49df8e4ef0213c2a6e00ad24
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java
@@ -0,0 +1,93 @@
+package org.apache.rocketmq.store.dleger;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.StoreTestBase;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DLegerCommitlogTest extends StoreTestBase {
+
+ private DefaultMessageStore createMessageStore(String base) throws Exception {
+ baseDirs.add(base);
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ storeConfig.setMapedFileSizeCommitLog(1024 * 100);
+ storeConfig.setMapedFileSizeConsumeQueue(1024);
+ storeConfig.setMaxHashSlotNum(100);
+ storeConfig.setMaxIndexNum(100 * 10);
+ storeConfig.setStorePathRootDir(base);
+ storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
+ storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
+
+ storeConfig.setEnableDLegerCommitLog(true);
+ storeConfig.setdLegerGroup(UUID.randomUUID().toString());
+ storeConfig.setdLegerPeers(String.format("n0-localhost:%d", nextPort()));
+ storeConfig.setdLegerSelfId("n0");
+ DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), new MessageArrivingListener() {
+
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+ byte[] filterBitMap, Map properties) {
+
+ }
+ }, new BrokerConfig());
+ defaultMessageStore.load();
+ defaultMessageStore.start();
+ return defaultMessageStore;
+ }
+
+ @Test
+ public void testPutAndGetMessage() throws Exception {
+ String base = createBaseDir();
+ DefaultMessageStore messageStore = createMessageStore(base);
+ Thread.sleep(1000);
+ String topic = UUID.randomUUID().toString();
+
+ List results = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ MessageExtBrokerInner msgInner = buildMessage();
+ msgInner.setTopic(topic);
+ msgInner.setQueueId(0);
+ PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
+ results.add(putMessageResult);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
+ }
+ Thread.sleep(100);
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
+ Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
+ Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+ Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
+ Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
+
+ for (int i = 0; i < results.size(); i++) {
+ ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ Assert.assertEquals(i, messageExt.getQueueOffset());
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
+ }
+ }
+
+
+}