提交 58496c2f 编写于 作者: D dongeforever

Rename dleger to dledger

上级 b31c17f9
...@@ -21,24 +21,24 @@ import java.util.concurrent.Executors; ...@@ -21,24 +21,24 @@ import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import io.openmessaging.storage.dleger.DLegerLeaderElector; import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dleger.DLegerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dleger.MemberState; import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dleger.utils.UtilAll; import io.openmessaging.storage.dledger.utils.UtilAll;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController; private BrokerController brokerController;
private DefaultMessageStore messageStore; private DefaultMessageStore messageStore;
private DLedgerCommitLog dLedgerCommitLog; private DLedgerCommitLog dLedgerCommitLog;
private DLegerServer dLegerServer; private DLedgerServer dLegerServer;
public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController; this.brokerController = brokerController;
this.messageStore = messageStore; this.messageStore = messageStore;
...@@ -63,11 +63,11 @@ public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeH ...@@ -63,11 +63,11 @@ public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeH
break; break;
case LEADER: case LEADER:
while (dLegerServer.getMemberState().isLeader() while (dLegerServer.getMemberState().isLeader()
&& (dLegerServer.getdLegerStore().getLegerEndIndex() != dLegerServer.getdLegerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { && (dLegerServer.getdLedgerStore().getLedgerEndIndex() != dLegerServer.getdLedgerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) {
UtilAll.sleep(100); UtilAll.sleep(100);
} }
boolean succ = dLegerServer.getMemberState().isLeader() boolean succ = dLegerServer.getMemberState().isLeader()
&& dLegerServer.getdLegerStore().getLegerEndIndex() == dLegerServer.getdLegerStore().getCommittedIndex() && dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0; && messageStore.dispatchBehindBytes() == 0;
if (succ) { if (succ) {
messageStore.recoverTopicQueueTable(); messageStore.recoverTopicQueueTable();
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>io.openmessaging.storage</groupId> <groupId>io.openmessaging.storage</groupId>
<artifactId>dleger</artifactId> <artifactId>dledger</artifactId>
<version>0.1-SNAPSHOT</version> <version>0.1-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
......
...@@ -16,16 +16,16 @@ ...@@ -16,16 +16,16 @@
*/ */
package org.apache.rocketmq.store.dledger; package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dleger.DLegerConfig; import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dleger.DLegerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dleger.entry.DLegerEntry; import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dleger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dleger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dleger.protocol.DLegerResponseCode; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dleger.store.file.DLegerMmapFileStore; import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dleger.store.file.MmapFile; import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dleger.store.file.MmapFileList; import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -53,9 +53,9 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; ...@@ -53,9 +53,9 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
* Store all metadata downtime for recovery, data protection reliability * Store all metadata downtime for recovery, data protection reliability
*/ */
public class DLedgerCommitLog extends CommitLog { public class DLedgerCommitLog extends CommitLog {
private final DLegerServer dLedgerServer; private final DLedgerServer dLedgerServer;
private final DLegerConfig dLedgerConfig; private final DLedgerConfig dLedgerConfig;
private final DLegerMmapFileStore dLedgerFileStore; private final DLedgerMmapFileStore dLedgerFileStore;
private final MmapFileList dLedgerFileList; private final MmapFileList dLedgerFileList;
//The id identifies the broker role, 0 means master, others means slave //The id identifies the broker role, 0 means master, others means slave
...@@ -73,7 +73,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -73,7 +73,7 @@ public class DLedgerCommitLog extends CommitLog {
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore); super(defaultMessageStore);
dLedgerConfig = new DLegerConfig(); dLedgerConfig = new DLedgerConfig();
dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
...@@ -82,10 +82,10 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -82,10 +82,10 @@ public class DLedgerCommitLog extends CommitLog {
dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean(); originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean();
id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
dLedgerServer = new DLegerServer(dLedgerConfig); dLedgerServer = new DLedgerServer(dLedgerConfig);
dLedgerFileStore = (DLegerMmapFileStore) dLedgerServer.getdLegerStore(); dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
assert bodyOffset == DLegerEntry.BODY_OFFSET; assert bodyOffset == DLedgerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset); buffer.putLong(entry.getPos() + bodyOffset);
}; };
...@@ -215,7 +215,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -215,7 +215,7 @@ public class DLedgerCommitLog extends CommitLog {
if (sbr == null) { if (sbr == null) {
return null; return null;
} else { } else {
return new DLegerSelectMappedBufferResult(sbr); return new DLedgerSelectMappedBufferResult(sbr);
} }
} }
...@@ -250,7 +250,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -250,7 +250,7 @@ public class DLedgerCommitLog extends CommitLog {
if (offset >= dLedgerFileStore.getCommittedPos()) { if (offset >= dLedgerFileStore.getCommittedPos()) {
return null; return null;
} }
int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData(); int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) { if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize); int pos = (int) (offset % mappedFileSize);
...@@ -288,7 +288,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -288,7 +288,7 @@ public class DLedgerCommitLog extends CommitLog {
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) { final boolean readBody) {
try { try {
int bodyOffset = DLegerEntry.BODY_OFFSET; int bodyOffset = DLedgerEntry.BODY_OFFSET;
int pos = byteBuffer.position(); int pos = byteBuffer.position();
int magic = byteBuffer.getInt(); int magic = byteBuffer.getInt();
if (magic == MmapFileList.BLANK_MAGIC_CODE) { if (magic == MmapFileList.BLANK_MAGIC_CODE) {
...@@ -383,7 +383,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -383,7 +383,7 @@ public class DLedgerCommitLog extends CommitLog {
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data); request.setBody(encodeResult.data);
dlegerFuture = dLedgerServer.handleAppend(request); dlegerFuture = dLedgerServer.handleAppend(request);
if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLegerResponseCode.SUCCESS.getCode()) { if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
//TO DO make sure the local store is ok //TO DO make sure the local store is ok
appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} else { } else {
...@@ -417,10 +417,10 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -417,10 +417,10 @@ public class DLedgerCommitLog extends CommitLog {
if (dlegerFuture != null) { if (dlegerFuture != null) {
try { try {
AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS); AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS);
switch (DLegerResponseCode.valueOf(appendEntryResponse.getCode())) { switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS: case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK; putMessageStatus = PutMessageStatus.PUT_OK;
long wroteOffset = appendEntryResponse.getPos() + DLegerEntry.BODY_OFFSET; long wroteOffset = appendEntryResponse.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0); appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0);
...@@ -465,7 +465,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -465,7 +465,7 @@ public class DLedgerCommitLog extends CommitLog {
if (offset < dividedCommitlogOffset) { if (offset < dividedCommitlogOffset) {
return getMessage(offset, size); return getMessage(offset, size);
} }
int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData(); int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) { if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize); int pos = (int) (offset % mappedFileSize);
...@@ -678,22 +678,24 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -678,22 +678,24 @@ public class DLedgerCommitLog extends CommitLog {
} }
static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr; private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr; this.sbr = sbr;
} }
public synchronized void release() { public synchronized void release() {
super.release(); super.release();
sbr.release(); if (sbr != null) {
sbr.release();
}
} }
} }
public DLegerServer getdLedgerServer() { public DLedgerServer getdLedgerServer() {
return dLedgerServer; return dLedgerServer;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr;
public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr;
}
@Override
public synchronized void release() {
super.release();
sbr.release();
}
}
...@@ -8,7 +8,7 @@ import java.util.UUID; ...@@ -8,7 +8,7 @@ import java.util.UUID;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import io.openmessaging.storage.dleger.DLegerServer; import io.openmessaging.storage.dledger.DLedgerServer;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
...@@ -43,8 +43,8 @@ public class DLedgerCommitlogTest extends StoreTestBase { ...@@ -43,8 +43,8 @@ public class DLedgerCommitlogTest extends StoreTestBase {
}, new BrokerConfig()); }, new BrokerConfig());
if (leaderId != null) { if (leaderId != null) {
DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
dLegerServer.getdLegerConfig().setEnableLeaderElector(false); dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) { if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1); dLegerServer.getMemberState().changeToLeader(-1);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册