diff --git a/store/pom.xml b/store/pom.xml index d7d0b9f0c9e2aadd4a891a6706583edec8e08a27..8f4b44a28813f342e16122b5fb33b5a6952ed723 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -31,7 +31,7 @@ io.openmessaging.storage dledger - 0.1 + 0.2.0 org.apache.rocketmq diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 7891f71067a26e4118efde7c79a0b7de7279b8be..c13ad4cfa22be8a5829075523b2d4e96510ca558 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -148,6 +148,10 @@ public class MessageStoreConfig { private String dLegerPeers; private String dLegerSelfId; + private String preferredLeaderId; + + private boolean isEnableBatchPush = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -702,4 +706,20 @@ public class MessageStoreConfig { public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) { this.enableDLegerCommitLog = enableDLegerCommitLog; } + + public String getPreferredLeaderId() { + return preferredLeaderId; + } + + public void setPreferredLeaderId(String preferredLeaderId) { + this.preferredLeaderId = preferredLeaderId; + } + + public boolean isEnableBatchPush() { + return isEnableBatchPush; + } + + public void setEnableBatchPush(boolean enableBatchPush) { + isEnableBatchPush = enableBatchPush; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 3361b63f826670193854752cda088742d049ef51..400ad786dbecc110fdea1b09c9c12705f07e8ba4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -84,6 +84,9 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); + dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId()); + dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush()); + id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; dLedgerServer = new DLedgerServer(dLedgerConfig); dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index 5b0ca347579dc7e391dab25645d5a87620a89269..5864b282c7bbff80bc4ca7a2e4db54541ace3bbd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -63,9 +63,9 @@ public class MessageStoreTestBase extends StoreTestBase { if (leaderId != null) { dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); if (selfId.equals(leaderId)) { - dLegerServer.getMemberState().changeToLeader(-1); + dLegerServer.getMemberState().changeToLeader(0); } else { - dLegerServer.getMemberState().changeToFollower(-1, leaderId); + dLegerServer.getMemberState().changeToFollower(0, leaderId); } }