From 58496c2f3996a264bc223d617b98f2bd53f5c7bf Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 4 Dec 2018 13:57:55 +0800 Subject: [PATCH] Rename dleger to dledger --- .../dledger/DLedgerRoleChangeHandler.java | 16 ++--- store/pom.xml | 2 +- .../store/dledger/DLedgerCommitLog.java | 60 ++++++++++--------- .../DLedgerSelectMappedBufferResult.java | 36 ----------- .../store/dledger/DLedgerCommitlogTest.java | 6 +- 5 files changed, 43 insertions(+), 77 deletions(-) delete mode 100644 store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java index f9391f32..b1f3bdcb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java @@ -21,24 +21,24 @@ import java.util.concurrent.Executors; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; -import io.openmessaging.storage.dleger.DLegerLeaderElector; -import io.openmessaging.storage.dleger.DLegerServer; -import io.openmessaging.storage.dleger.MemberState; -import io.openmessaging.storage.dleger.utils.UtilAll; +import io.openmessaging.storage.dledger.DLedgerLeaderElector; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.MemberState; +import io.openmessaging.storage.dledger.utils.UtilAll; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.dledger.DLedgerCommitLog; -public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { +public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); private BrokerController brokerController; private DefaultMessageStore messageStore; private DLedgerCommitLog dLedgerCommitLog; - private DLegerServer dLegerServer; + private DLedgerServer dLegerServer; public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { this.brokerController = brokerController; this.messageStore = messageStore; @@ -63,11 +63,11 @@ public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeH break; case LEADER: while (dLegerServer.getMemberState().isLeader() - && (dLegerServer.getdLegerStore().getLegerEndIndex() != dLegerServer.getdLegerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { + && (dLegerServer.getdLedgerStore().getLedgerEndIndex() != dLegerServer.getdLedgerStore().getCommittedIndex() || messageStore.dispatchBehindBytes() != 0)) { UtilAll.sleep(100); } boolean succ = dLegerServer.getMemberState().isLeader() - && dLegerServer.getdLegerStore().getLegerEndIndex() == dLegerServer.getdLegerStore().getCommittedIndex() + && dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex() && messageStore.dispatchBehindBytes() == 0; if (succ) { messageStore.recoverTopicQueueTable(); diff --git a/store/pom.xml b/store/pom.xml index 1c54d4dc..0d7681d3 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -30,7 +30,7 @@ io.openmessaging.storage - dleger + dledger 0.1-SNAPSHOT diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 5945b851..028d5379 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -16,16 +16,16 @@ */ package org.apache.rocketmq.store.dledger; -import io.openmessaging.storage.dleger.DLegerConfig; -import io.openmessaging.storage.dleger.DLegerServer; -import io.openmessaging.storage.dleger.entry.DLegerEntry; -import io.openmessaging.storage.dleger.protocol.AppendEntryRequest; -import io.openmessaging.storage.dleger.protocol.AppendEntryResponse; -import io.openmessaging.storage.dleger.protocol.DLegerResponseCode; -import io.openmessaging.storage.dleger.store.file.DLegerMmapFileStore; -import io.openmessaging.storage.dleger.store.file.MmapFile; -import io.openmessaging.storage.dleger.store.file.MmapFileList; -import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult; +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; +import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; +import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; +import io.openmessaging.storage.dledger.store.file.MmapFile; +import io.openmessaging.storage.dledger.store.file.MmapFileList; +import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.concurrent.CompletableFuture; @@ -53,9 +53,9 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; * Store all metadata downtime for recovery, data protection reliability */ public class DLedgerCommitLog extends CommitLog { - private final DLegerServer dLedgerServer; - private final DLegerConfig dLedgerConfig; - private final DLegerMmapFileStore dLedgerFileStore; + private final DLedgerServer dLedgerServer; + private final DLedgerConfig dLedgerConfig; + private final DLedgerMmapFileStore dLedgerFileStore; private final MmapFileList dLedgerFileList; //The id identifies the broker role, 0 means master, others means slave @@ -73,7 +73,7 @@ public class DLedgerCommitLog extends CommitLog { public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); - dLedgerConfig = new DLegerConfig(); + dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); @@ -82,10 +82,10 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); originalDledgerEnableForceClean = dLedgerConfig.isEnableDiskForceClean(); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; - dLedgerServer = new DLegerServer(dLedgerConfig); - dLedgerFileStore = (DLegerMmapFileStore) dLedgerServer.getdLegerStore(); - DLegerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { - assert bodyOffset == DLegerEntry.BODY_OFFSET; + dLedgerServer = new DLedgerServer(dLedgerConfig); + dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); + DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { + assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; @@ -215,7 +215,7 @@ public class DLedgerCommitLog extends CommitLog { if (sbr == null) { return null; } else { - return new DLegerSelectMappedBufferResult(sbr); + return new DLedgerSelectMappedBufferResult(sbr); } } @@ -250,7 +250,7 @@ public class DLedgerCommitLog extends CommitLog { if (offset >= dLedgerFileStore.getCommittedPos()) { return null; } - int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData(); + int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); @@ -288,7 +288,7 @@ public class DLedgerCommitLog extends CommitLog { public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { try { - int bodyOffset = DLegerEntry.BODY_OFFSET; + int bodyOffset = DLedgerEntry.BODY_OFFSET; int pos = byteBuffer.position(); int magic = byteBuffer.getInt(); if (magic == MmapFileList.BLANK_MAGIC_CODE) { @@ -383,7 +383,7 @@ public class DLedgerCommitLog extends CommitLog { request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dlegerFuture = dLedgerServer.handleAppend(request); - if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLegerResponseCode.SUCCESS.getCode()) { + if (dlegerFuture.isDone() && dlegerFuture.get().getCode() != DLedgerResponseCode.SUCCESS.getCode()) { //TO DO make sure the local store is ok appendResult = new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } else { @@ -417,10 +417,10 @@ public class DLedgerCommitLog extends CommitLog { if (dlegerFuture != null) { try { AppendEntryResponse appendEntryResponse = dlegerFuture.get(3, TimeUnit.SECONDS); - switch (DLegerResponseCode.valueOf(appendEntryResponse.getCode())) { + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { case SUCCESS: putMessageStatus = PutMessageStatus.PUT_OK; - long wroteOffset = appendEntryResponse.getPos() + DLegerEntry.BODY_OFFSET; + long wroteOffset = appendEntryResponse.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, 0); @@ -465,7 +465,7 @@ public class DLedgerCommitLog extends CommitLog { if (offset < dividedCommitlogOffset) { return getMessage(offset, size); } - int mappedFileSize = this.dLedgerServer.getdLegerConfig().getMappedFileSizeForEntryData(); + int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); @@ -678,22 +678,24 @@ public class DLedgerCommitLog extends CommitLog { } - static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { + public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { private SelectMmapBufferResult sbr; - public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { + public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) { super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); this.sbr = sbr; } public synchronized void release() { super.release(); - sbr.release(); + if (sbr != null) { + sbr.release(); + } } } - public DLegerServer getdLedgerServer() { + public DLedgerServer getdLedgerServer() { return dLedgerServer; } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java deleted file mode 100644 index 7f5e0cd1..00000000 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.store.dledger; - -import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult; -import org.apache.rocketmq.store.SelectMappedBufferResult; - -public class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { - - private SelectMmapBufferResult sbr; - public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) { - super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); - this.sbr = sbr; - } - - @Override - public synchronized void release() { - super.release(); - sbr.release(); - } -} diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 7d5d07b3..57742856 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -8,7 +8,7 @@ import java.util.UUID; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; -import io.openmessaging.storage.dleger.DLegerServer; +import io.openmessaging.storage.dledger.DLedgerServer; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; @@ -43,8 +43,8 @@ public class DLedgerCommitlogTest extends StoreTestBase { }, new BrokerConfig()); if (leaderId != null) { - DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); - dLegerServer.getdLegerConfig().setEnableLeaderElector(false); + DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); + dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); if (selfId.equals(leaderId)) { dLegerServer.getMemberState().changeToLeader(-1); } else { -- GitLab