提交 f8634b4b 编写于 作者: D dongeforever

Divide old commitlog and dledger commitlog

上级 d30d0e85
......@@ -237,7 +237,7 @@ public class BrokerController {
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
......
......@@ -43,7 +43,7 @@ public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeH
this.brokerController = brokerController;
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLegerServer();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
}
@Override public void handle(long term, MemberState.Role role) {
......
......@@ -48,7 +48,7 @@ public class CommitLog {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
......
......@@ -41,6 +41,7 @@ import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
......@@ -52,90 +53,146 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
* Store all metadata downtime for recovery, data protection reliability
*/
public class DLedgerCommitLog extends CommitLog {
private final DLegerServer dLegerServer;
private final DLegerConfig dLegerConfig;
private final DLegerMmapFileStore dLegerFileStore;
private final MmapFileList dLegerFileList;
private final DLegerServer dLedgerServer;
private final DLegerConfig dLedgerConfig;
private final DLegerMmapFileStore dLedgerFileStore;
private final MmapFileList dLedgerFileList;
//The id identifies the broker role, 0 means master, others means slave
private final int id;
private final MessageSerializer messageSerializer;
private volatile long beginTimeInDledgerLock = 0;
//This offset separate the old commitlog from dledger commitlog
private long dividedCommitlogOffset = -1;
private final MessageSerializer messageSerializer;
private volatile long beginTimeInDlegerLock = 0;
//The old commitlog should be deleted before the dledger commitlog
private final boolean originalDledgerEnableForceClean;
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore);
dLegerConfig = new DLegerConfig();
dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
dLegerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
dLegerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
dLegerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
id = Integer.valueOf(dLegerConfig.getSelfId().substring(1)) + 1;
dLegerServer = new DLegerServer(dLegerConfig);
dLegerFileStore = (DLegerMmapFileStore) dLegerServer.getdLegerStore();
dLedgerConfig = new DLegerConfig();
dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean();
id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
dLedgerServer = new DLegerServer(dLedgerConfig);
dLedgerFileStore = (DLegerMmapFileStore) dLedgerServer.getdLegerStore();
DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
assert bodyOffset == DLegerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset);
};
dLegerFileStore.addAppendHook(appendHook);
dLegerFileList = dLegerFileStore.getDataFileList();
dLedgerFileStore.addAppendHook(appendHook);
dLedgerFileList = dLedgerFileStore.getDataFileList();
this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
@Override
public boolean load() {
/*boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;*/
boolean result = super.load();
if(!result) {
return false;
}
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {
return true;
}
dLedgerConfig.setEnableDiskForceClean(false);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
int currentPos = 0;
boolean needWriteMagicCode = true;
while (true) {
byteBuffer.position(currentPos);
// 1 TOTAL SIZE
int totalSize = byteBuffer.getInt();
int magicCode = byteBuffer.getInt();
if (magicCode == BLANK_MAGIC_CODE) {
needWriteMagicCode = false;
break;
}
if (magicCode != MESSAGE_MAGIC_CODE) {
log.info("Recover old commitlog found a illegal magic code={}", magicCode);
break;
}
currentPos = currentPos + totalSize;
}
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={}", needWriteMagicCode, currentPos, mappedFile.getFileName());
if (needWriteMagicCode) {
byteBuffer.position(currentPos);
byteBuffer.putInt(mappedFile.getFileSize() - currentPos);
byteBuffer.putInt(BLANK_MAGIC_CODE);
mappedFile.flush(0);
}
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
if (dLedgerFileList.getMappedFiles().isEmpty()) {
log.info("Recover to set the initial offset the dledger commitlog dividedCommitlogOffset={}", dividedCommitlogOffset);
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
}
return true;
}
@Override
public void start() {
dLegerServer.startup();
/* this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}*/
dLedgerServer.startup();
}
@Override
public void shutdown() {
/* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
}
this.flushCommitLogService.shutdown();*/
dLegerServer.shutdown();
dLedgerServer.shutdown();
}
@Override
public long flush() {
dLegerFileStore.flush();
return dLegerFileList.getFlushedWhere();
dLedgerFileStore.flush();
return dLedgerFileList.getFlushedWhere();
}
@Override
public long getMaxOffset() {
if (this.dLegerFileStore.getCommittedPos() != -1) {
return dLegerFileStore.getCommittedPos();
if (this.dLedgerFileStore.getCommittedPos() != -1) {
return dLedgerFileStore.getCommittedPos();
} else {
return this.dLegerFileList.getMaxWrotePosition();
return this.dLedgerFileList.getMaxWrotePosition();
}
}
@Override
public long getMinOffset() {
if (mappedFileQueue.getMappedFiles().isEmpty()) {
return dLedgerFileList.getMinOffset();
}
return mappedFileQueue.getMinOffset();
}
@Override
public long getConfirmOffset() {
return this.dLedgerFileStore.getCommittedPos() == -1 ? getMaxOffset()
: this.dLedgerFileStore.getCommittedPos();
}
@Override
public void setConfirmOffset(long phyOffset) {
log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset);
}
@Override
public long remainHowManyDataToCommit() {
return dLegerFileList.remainHowManyDataToCommit();
return dLedgerFileList.remainHowManyDataToCommit();
}
@Override
public long remainHowManyDataToFlush() {
return dLegerFileList.remainHowManyDataToFlush();
return dLedgerFileList.remainHowManyDataToFlush();
}
@Override
......@@ -145,25 +202,15 @@ public class DLedgerCommitLog extends CommitLog {
final long intervalForcibly,
final boolean cleanImmediately
) {
return 0;
}
static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr;
}
public synchronized void release() {
super.release();
sbr.release();
int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
if (mappedFileQueue.getMappedFiles().isEmpty()) {
dLedgerConfig.setEnableDiskForceClean(originalDledgerEnableForceClean);
}
//return 1 to prevent too much log in defaultMessageStore
return count > 0 ? count : 1;
}
public SelectMappedBufferResult convertSbr(SelectMmapBufferResult sbr) {
if (sbr == null) {
return null;
......@@ -174,7 +221,7 @@ public class DLedgerCommitLog extends CommitLog {
}
public SelectMmapBufferResult truncate(SelectMmapBufferResult sbr) {
long committedPos = dLegerFileStore.getCommittedPos();
long committedPos = dLedgerFileStore.getCommittedPos();
if (sbr == null || sbr.getStartOffset() == committedPos) {
return null;
}
......@@ -186,22 +233,25 @@ public class DLedgerCommitLog extends CommitLog {
}
}
/**
* Read CommitLog data, use data replication
*/
@Override
public SelectMappedBufferResult getData(final long offset) {
if (offset < dividedCommitlogOffset) {
return super.getData(offset);
}
return this.getData(offset, offset == 0);
}
@Override
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
if (offset >= dLegerFileStore.getCommittedPos()) {
if (offset < dividedCommitlogOffset) {
return super.getData(offset, returnFirstOnNotFound);
}
if (offset >= dLedgerFileStore.getCommittedPos()) {
return null;
}
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos);
......@@ -211,17 +261,18 @@ public class DLedgerCommitLog extends CommitLog {
return null;
}
/**
* When the normal exit, data recovery, all memory data have been flush
*/
@Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
if (dLedgerFileList.getMappedFiles().isEmpty()) {
super.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
}
@Override
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
if (dLedgerFileList.getMappedFiles().isEmpty()) {
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
}
}
@Override
......@@ -229,11 +280,6 @@ public class DLedgerCommitLog extends CommitLog {
return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
}
/**
* check the message and returns the message size
*
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
@Override
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
......@@ -259,25 +305,15 @@ public class DLedgerCommitLog extends CommitLog {
return new DispatchRequest(-1, false /* success */);
}
@Override
public long getConfirmOffset() {
return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset()
: this.dLegerFileStore.getCommittedPos();
}
@Override
public void setConfirmOffset(long phyOffset) {
log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset);
}
@Override
public boolean resetOffset(long offset) {
//currently, it seems resetOffset has no use
return false;
}
@Override
public long getBeginTimeInLock() {
return beginTimeInDlegerLock;
return beginTimeInDledgerLock;
}
@Override
......@@ -325,7 +361,7 @@ public class DLedgerCommitLog extends CommitLog {
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long queueOffset = -1;
try {
beginTimeInDlegerLock = this.defaultMessageStore.getSystemClock().now();
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
//TO DO use buffer
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
......@@ -339,10 +375,10 @@ public class DLedgerCommitLog extends CommitLog {
}
} else {
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLegerConfig.getGroup());
request.setRemoteId(dLegerServer.getMemberState().getSelfId());
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dlegerFuture = dLegerServer.handleAppend(request);
dlegerFuture = dLedgerServer.handleAppend(request);
if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLegerResponseCode.SUCCESS.getCode()) {
//TO DO make sure the local store is ok
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
......@@ -361,12 +397,12 @@ public class DLedgerCommitLog extends CommitLog {
}
}
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDlegerLock;
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
} catch (Exception e) {
log.error("Put message error", e);
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} finally {
beginTimeInDlegerLock = 0;
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
......@@ -419,17 +455,14 @@ public class DLedgerCommitLog extends CommitLog {
}
@Override
public long getMinOffset() {
return dLegerFileList.getMinOffset();
}
@Override
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);
if (offset < dividedCommitlogOffset) {
return getMessage(offset, size);
}
int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return convertSbr(mappedFile.selectMappedBuffer(pos, size));
......@@ -455,30 +488,25 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public void destroy() {
//TO DO
super.destroy();
dLedgerFileList.destroy();
}
@Override
public boolean appendData(long startOffset, byte[] data) {
//TO DO
//the old ha service will invoke method, here to prevent it
return false;
}
@Override
public boolean retryDeleteFirstFile(final long intervalForcibly) {
return false;
}
@Override
public void checkSelf() {
dLegerFileList.checkSelf();
dLedgerFileList.checkSelf();
}
@Override
public long lockTimeMills() {
long diff = 0;
long begin = this.beginTimeInDlegerLock;
long begin = this.beginTimeInDledgerLock;
if (begin > 0) {
diff = this.defaultMessageStore.now() - begin;
}
......@@ -646,11 +674,30 @@ public class DLedgerCommitLog extends CommitLog {
}
public DLegerServer getdLegerServer() {
return dLegerServer;
static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr;
}
public synchronized void release() {
super.release();
sbr.release();
}
}
public DLegerServer getdLedgerServer() {
return dLedgerServer;
}
public int getId() {
return id;
}
public long getDividedCommitlogOffset() {
return dividedCommitlogOffset;
}
}
......@@ -43,7 +43,7 @@ public class DLedgerCommitlogTest extends StoreTestBase {
}, new BrokerConfig());
if (leaderId != null) {
DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer();
DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
dLegerServer.getdLegerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册