提交 515bc353 编写于 作者: D dongeforever

Add dleger commitlog with tests

上级 0f153b9d
...@@ -41,6 +41,7 @@ public class MessageDecoder { ...@@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481; public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1; public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2; public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE + 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC + 4 // 3 BODYCRC
......
...@@ -100,8 +100,8 @@ ...@@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip> <maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip> <maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties --> <!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code --> <!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath> <sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
......
...@@ -28,6 +28,21 @@ ...@@ -28,6 +28,21 @@
<name>rocketmq-store ${project.version}</name> <name>rocketmq-store ${project.version}</name>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-dleger</artifactId>
<version>0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId> <artifactId>rocketmq-common</artifactId>
......
...@@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; ...@@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
public class CommitLog { public class CommitLog {
// Message's MAGIC CODE daa320a7 // Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481; public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194 // End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124; protected final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue; private final MappedFileQueue mappedFileQueue;
private final DefaultMessageStore defaultMessageStore; protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService; private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
...@@ -57,11 +57,11 @@ public class CommitLog { ...@@ -57,11 +57,11 @@ public class CommitLog {
private final AppendMessageCallback appendMessageCallback; private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal; private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024); protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L; protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0; protected volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock; protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) { public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
...@@ -366,7 +366,7 @@ public class CommitLog { ...@@ -366,7 +366,7 @@ public class CommitLog {
return new DispatchRequest(-1, false /* success */); return new DispatchRequest(-1, false /* success */);
} }
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE + 4 //MAGICCODE
+ 4 //BODYCRC + 4 //BODYCRC
......
...@@ -348,7 +348,7 @@ public class ConsumeQueue { ...@@ -348,7 +348,7 @@ public class ConsumeQueue {
long tagsCode = result.getByteBuffer().getLong(); long tagsCode = result.getByteBuffer().getLong();
if (offsetPy >= phyMinOffset) { if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; this.minLogicOffset = mappedFile.getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}", log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId); this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file. // This maybe not take effect, when not every consume queue has extend file.
......
...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; ...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dleger.DLegerCommitLog;
import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult; import org.apache.rocketmq.store.index.QueryOffsetResult;
...@@ -119,7 +120,11 @@ public class DefaultMessageStore implements MessageStore { ...@@ -119,7 +120,11 @@ public class DefaultMessageStore implements MessageStore {
this.messageStoreConfig = messageStoreConfig; this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager; this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this); this.allocateMappedFileService = new AllocateMappedFileService(this);
this.commitLog = new CommitLog(this); if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLegerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32); this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService(); this.flushConsumeQueueService = new FlushConsumeQueueService();
...@@ -1763,7 +1768,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1763,7 +1768,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) { for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest = DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize(); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) { if (dispatchRequest.isSuccess()) {
if (size > 0) { if (size > 0) {
......
...@@ -22,7 +22,7 @@ public class DispatchRequest { ...@@ -22,7 +22,7 @@ public class DispatchRequest {
private final String topic; private final String topic;
private final int queueId; private final int queueId;
private final long commitLogOffset; private final long commitLogOffset;
private final int msgSize; private int msgSize;
private final long tagsCode; private final long tagsCode;
private final long storeTimestamp; private final long storeTimestamp;
private final long consumeQueueOffset; private final long consumeQueueOffset;
...@@ -35,6 +35,8 @@ public class DispatchRequest { ...@@ -35,6 +35,8 @@ public class DispatchRequest {
private final Map<String, String> propertiesMap; private final Map<String, String> propertiesMap;
private byte[] bitMap; private byte[] bitMap;
private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
public DispatchRequest( public DispatchRequest(
final String topic, final String topic,
final int queueId, final int queueId,
...@@ -156,4 +158,16 @@ public class DispatchRequest { ...@@ -156,4 +158,16 @@ public class DispatchRequest {
public void setBitMap(byte[] bitMap) { public void setBitMap(byte[] bitMap) {
this.bitMap = bitMap; this.bitMap = bitMap;
} }
public void setMsgSize(int msgSize) {
this.msgSize = msgSize;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
} }
...@@ -48,9 +48,9 @@ public class SelectMappedBufferResult { ...@@ -48,9 +48,9 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size); this.byteBuffer.limit(this.size);
} }
public MappedFile getMappedFile() { /* public MappedFile getMappedFile() {
return mappedFile; return mappedFile;
} }*/
// @Override // @Override
// protected void finalize() { // protected void finalize() {
......
...@@ -143,6 +143,11 @@ public class MessageStoreConfig { ...@@ -143,6 +143,11 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5; private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false; private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;
public boolean isDebugLockEnable() { public boolean isDebugLockEnable() {
return debugLockEnable; return debugLockEnable;
} }
...@@ -666,4 +671,35 @@ public class MessageStoreConfig { ...@@ -666,4 +671,35 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
} }
public String getdLegerGroup() {
return dLegerGroup;
}
public void setdLegerGroup(String dLegerGroup) {
this.dLegerGroup = dLegerGroup;
}
public String getdLegerPeers() {
return dLegerPeers;
}
public void setdLegerPeers(String dLegerPeers) {
this.dLegerPeers = dLegerPeers;
}
public String getdLegerSelfId() {
return dLegerSelfId;
}
public void setdLegerSelfId(String dLegerSelfId) {
this.dLegerSelfId = dLegerSelfId;
}
public boolean isEnableDLegerCommitLog() {
return enableDLegerCommitLog;
}
public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dleger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.dleger.DLegerConfig;
import org.apache.rocketmq.dleger.DLegerServer;
import org.apache.rocketmq.dleger.entry.DLegerEntry;
import org.apache.rocketmq.dleger.protocol.AppendEntryRequest;
import org.apache.rocketmq.dleger.protocol.AppendEntryResponse;
import org.apache.rocketmq.dleger.protocol.DLegerResponseCode;
import org.apache.rocketmq.dleger.store.file.DLegerMmapFileStore;
import org.apache.rocketmq.dleger.store.file.MmapFile;
import org.apache.rocketmq.dleger.store.file.MmapFileList;
import org.apache.rocketmq.dleger.store.file.SelectMmapBufferResult;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.StoreStatsService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
/**
* Store all metadata downtime for recovery, data protection reliability
*/
public class DLegerCommitLog extends CommitLog {
private final DLegerServer dLegerServer;
private final DLegerMmapFileStore dLegerFileStore;
private final MmapFileList dLegerFileList;
private final MessageSerializer messageSerializer;
private volatile long beginTimeInLock = 0;
public DLegerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore);
DLegerConfig dLegerConfig = new DLegerConfig();
dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
dLegerServer = new DLegerServer(dLegerConfig);
dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore();
DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
assert bodyOffset == DLegerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset);
};
dLegerFileStore.addAppendHook(appendHook);
dLegerFileList = dLegerFileStore.getDataFileList();
this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
public boolean load() {
/*boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;*/
return true;
}
public void start() {
dLegerServer.startup();
/* this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}*/
}
public void shutdown() {
/* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
}
this.flushCommitLogService.shutdown();*/
dLegerServer.shutdown();
}
public long flush() {
dLegerFileStore.flush();
return dLegerFileList.getFlushedWhere();
}
public long getMaxOffset() {
return this.dLegerFileList.getMaxWrotePosition();
}
public long remainHowManyDataToCommit() {
return dLegerFileList.remainHowManyDataToCommit();
}
public long remainHowManyDataToFlush() {
return dLegerFileList.remainHowManyDataToFlush();
}
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return 0;
}
/**
* Read CommitLog data, use data replication
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr;
}
public synchronized void release() {
super.release();
sbr.release();
}
}
public SelectMappedBufferResult convertSbr(SelectMmapBufferResult sbr) {
if (sbr == null) {
return null;
} else {
return new DLegerSelectMappedBufferResult(sbr);
}
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos);
return convertSbr(sbr);
}
return null;
}
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally() {
}
public void recoverAbnormally() {
}
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) {
return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
}
private void doNothingForDeadCode(final Object obj) {
if (obj != null) {
log.debug(String.valueOf(obj.hashCode()));
}
}
/**
* check the message and returns the message size
*
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
try {
int bodyOffset = DLegerEntry.BODY_OFFSET;
byteBuffer.position(byteBuffer.position() + bodyOffset);
DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
if (dispatchRequest.isSuccess()) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
} else if (dispatchRequest.getMsgSize() > 0) {
dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset);
}
return dispatchRequest;
} catch (Exception e) {
}
return new DispatchRequest(-1, false /* success */);
}
public long getConfirmOffset() {
return this.confirmOffset;
}
public void setConfirmOffset(long phyOffset) {
this.confirmOffset = phyOffset;
}
private void notifyMessageArriving() {
}
public boolean resetOffset(long offset) {
//return this.mappedFileQueue.resetOffset(offset);
return false;
}
public long getBeginTimeInLock() {
return beginTimeInLock;
}
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
//should be consistent with the old version
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// Back to Results
AppendMessageResult appendResult = null;
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
CompletableFuture<AppendEntryResponse> dlegerFuture = null;
EncodeResult encodeResult = null;
long eclipseTimeInLock = 0L;
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long queueOffset = -1;
try {
beginTimeInLock = this.defaultMessageStore.getSystemClock().now();
//TO DO use buffer
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
appendResult = new AppendMessageResult(encodeResult.status);
switch (encodeResult.status) {
case PROPERTIES_SIZE_EXCEEDED:
case MESSAGE_SIZE_EXCEEDED:
putMessageStatus = PutMessageStatus.MESSAGE_ILLEGAL;
break;
}
} else {
AppendEntryRequest request = new AppendEntryRequest();
request.setRemoteId(dLegerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dlegerFuture = dLegerServer.handleAppend(request);
if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLegerResponseCode.SUCCESS.getCode()) {
//TO DO make sure the local store is ok
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} else {
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLegerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
}
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInLock;
} catch (Exception e) {
log.error("Put message error", e);
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} finally {
putMessageLock.unlock();
}
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult);
}
if (dlegerFuture != null) {
try {
AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS);
switch (DLegerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
long wroteOffset = appendEntryResponse.getPos() + DLegerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0);
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
} catch (Exception ignored) {
putMessageStatus = PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
}
return putMessageResult;
}
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
}
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
}
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
/**
* According to receive certain message or offset storage time if an error occurs, it returns -1
*/
public long pickupStoreTimestamp(final long offset, final int size) {
if (offset >= this.getMinOffset()) {
SelectMappedBufferResult result = this.getMessage(offset, size);
if (null != result) {
try {
return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
} finally {
result.release();
}
}
}
return -1;
}
public long getMinOffset() {
return dLegerFileList.getMinOffset();
}
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return convertSbr(mappedFile.selectMappedBuffer(pos, size));
}
return null;
}
public long rollNextFile(final long offset) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
return offset + mappedFileSize - offset % mappedFileSize;
}
public HashMap<String, Long> getTopicQueueTable() {
return topicQueueTable;
}
public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
this.topicQueueTable = topicQueueTable;
}
public void destroy() {
//TO DO
}
public boolean appendData(long startOffset, byte[] data) {
//TO DO
return false;
}
public boolean retryDeleteFirstFile(final long intervalForcibly) {
//TO DO
return false;
}
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
String key = topic + "-" + queueId;
synchronized (this) {
this.topicQueueTable.remove(key);
}
log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
}
public void checkSelf() {
dLegerFileList.checkSelf();
}
public long lockTimeMills() {
long diff = 0;
long begin = this.beginTimeInLock;
if (begin > 0) {
diff = this.defaultMessageStore.now() - begin;
}
if (diff < 0) {
diff = 0;
}
return diff;
}
class EncodeResult {
private String queueOffsetKey;
private byte[] data;
private AppendMessageStatus status;
public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey) {
this.data = data;
this.status = status;
this.queueOffsetKey = queueOffsetKey;
}
}
class MessageSerializer {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private final ByteBuffer msgIdMemory;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
// The maximum length of the message
private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
private final StringBuilder msgIdBuilder = new StringBuilder();
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
MessageSerializer(final int size) {
this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
public ByteBuffer getMsgStoreItemMemory() {
return msgStoreItemMemory;
}
public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = 0;
this.resetByteBuffer(hostHolder, 8);
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLegerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLegerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new EncodeResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED, null, key);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
DLegerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(DLegerCommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(wroteOffset);
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = DLegerCommitLog.this.defaultMessageStore.now();
byte[] data = new byte[msgLen];
this.msgStoreItemMemory.clear();
this.msgStoreItemMemory.get(data);
return new EncodeResult(AppendMessageStatus.PUT_OK, data, key);
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dleger;
import org.apache.rocketmq.dleger.store.file.SelectMmapBufferResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr;
}
@Override
public synchronized void release() {
super.release();
sbr.release();
}
}
package org.apache.rocketmq.store;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
public class StoreTestBase {
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost = new InetSocketAddress("127.0.0.1", 8123);
private SocketAddress StoreHost = BornHost;
private byte[] MessageBody = new byte[1024];
protected Set<String> baseDirs = new HashSet<>();
private AtomicInteger port = new AtomicInteger(30000);
public int nextPort() {
return port.incrementAndGet();
}
protected MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
}
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
System.exit(1);
}
return baseDir;
}
public static void deleteFile(String fileName) {
deleteFile(new File(fileName));
}
public static void deleteFile(File file) {
UtilAll.deleteFile(file);
}
@After
public void clear() {
for (String baseDir : baseDirs) {
deleteFile(baseDir);
}
}
}
package org.apache.rocketmq.store.dleger;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
import org.junit.Test;
public class DLegerCommitlogTest extends StoreTestBase {
private DefaultMessageStore createMessageStore(String base) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(UUID.randomUUID().toString());
storeConfig.setdLegerPeers(String.format("n0-localhost:%d", nextPort()));
storeConfig.setdLegerSelfId("n0");
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLegerCommitlogTest"), new MessageArrivingListener() {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
}
}, new BrokerConfig());
defaultMessageStore.load();
defaultMessageStore.start();
return defaultMessageStore;
}
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
DefaultMessageStore messageStore = createMessageStore(base);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册