From f5b79588d58a1cc2842d364e06a469c0b67cdb39 Mon Sep 17 00:00:00 2001 From: rongtong Date: Fri, 31 Jul 2020 17:22:22 +0800 Subject: [PATCH] [ISSUE #2217] Upgrade dledger version (#2218) * feature(dledger):upgrade dledger version and support preferred leader/batch push * feature(dledger):pass the failed UT and pass the CI --- store/pom.xml | 2 +- .../store/config/MessageStoreConfig.java | 20 +++++++++++++++++++ .../store/dledger/DLedgerCommitLog.java | 3 +++ .../store/dledger/MessageStoreTestBase.java | 4 ++-- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/store/pom.xml b/store/pom.xml index d7d0b9f0..8f4b44a2 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -31,7 +31,7 @@ io.openmessaging.storage dledger - 0.1 + 0.2.0 org.apache.rocketmq diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 7891f710..c13ad4cf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -148,6 +148,10 @@ public class MessageStoreConfig { private String dLegerPeers; private String dLegerSelfId; + private String preferredLeaderId; + + private boolean isEnableBatchPush = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -702,4 +706,20 @@ public class MessageStoreConfig { public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) { this.enableDLegerCommitLog = enableDLegerCommitLog; } + + public String getPreferredLeaderId() { + return preferredLeaderId; + } + + public void setPreferredLeaderId(String preferredLeaderId) { + this.preferredLeaderId = preferredLeaderId; + } + + public boolean isEnableBatchPush() { + return isEnableBatchPush; + } + + public void setEnableBatchPush(boolean enableBatchPush) { + isEnableBatchPush = enableBatchPush; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3361b63f..400ad786 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -84,6 +84,9 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); + dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId()); + dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush()); + id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; dLedgerServer = new DLedgerServer(dLedgerConfig); dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index 5b0ca347..5864b282 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -63,9 +63,9 @@ public class MessageStoreTestBase extends StoreTestBase { if (leaderId != null) { dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); if (selfId.equals(leaderId)) { - dLegerServer.getMemberState().changeToLeader(-1); + dLegerServer.getMemberState().changeToLeader(0); } else { - dLegerServer.getMemberState().changeToFollower(-1, leaderId); + dLegerServer.getMemberState().changeToFollower(0, leaderId); } } -- GitLab