From d30d0e85caedf36528934936c2417226c1ca5305 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Mon, 3 Dec 2018 14:30:20 +0800 Subject: [PATCH] Polish DLedgerCommitlog, and rename dleger to dledger --- .../rocketmq/broker/BrokerController.java | 10 +- .../DLedgerRoleChangeHandler.java} | 18 +-- .../org/apache/rocketmq/store/CommitLog.java | 1 + .../rocketmq/store/DefaultMessageStore.java | 4 +- .../DLedgerCommitLog.java} | 105 +++++++----------- .../DLedgerSelectMappedBufferResult.java} | 6 +- .../DLedgerCommitlogTest.java} | 6 +- .../ProduceAndConsumeTest.java | 2 +- 8 files changed, 66 insertions(+), 86 deletions(-) rename broker/src/main/java/org/apache/rocketmq/broker/{dleger/DLegerRoleChangeHandler.java => dledger/DLedgerRoleChangeHandler.java} (88%) rename store/src/main/java/org/apache/rocketmq/store/{dleger/DLegerCommitLog.java => dledger/DLedgerCommitLog.java} (90%) rename store/src/main/java/org/apache/rocketmq/store/{dleger/DLegerSelectMappedBufferResult.java => dledger/DLedgerSelectMappedBufferResult.java} (85%) rename store/src/test/java/org/apache/rocketmq/store/{dleger/DLegerCommitlogTest.java => dledger/DLedgerCommitlogTest.java} (96%) rename test/src/test/java/org/apache/rocketmq/test/base/{dleger => dledger}/ProduceAndConsumeTest.java (99%) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 24453a59..72cbbc1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -39,7 +39,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; -import org.apache.rocketmq.broker.dleger.DLegerRoleChangeHandler; +import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager; @@ -99,7 +99,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.dleger.DLegerCommitLog; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -236,8 +236,8 @@ public class BrokerController { new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); if (messageStoreConfig.isEnableDLegerCommitLog()) { - DLegerRoleChangeHandler roleChangeHandler = new DLegerRoleChangeHandler(this, (DefaultMessageStore) messageStore); - ((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler); + DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); + ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler); } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin @@ -784,7 +784,7 @@ public class BrokerController { } if (messageStoreConfig.isEnableDLegerCommitLog()) { - changeToSlave(((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId()); + changeToSlave(((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId()); } else { startProcessorByHa(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java similarity index 88% rename from broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java rename to broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java index 6c5fddcd..3a2e19cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dleger/DLegerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.dleger; +package org.apache.rocketmq.broker.dledger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,21 +29,21 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.BrokerRole; -import org.apache.rocketmq.store.dleger.DLegerCommitLog; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; -public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { +public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); private BrokerController brokerController; private DefaultMessageStore messageStore; - private DLegerCommitLog dLegerCommitLog; + private DLedgerCommitLog dLedgerCommitLog; private DLegerServer dLegerServer; - public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { + public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { this.brokerController = brokerController; this.messageStore = messageStore; - this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog(); - this.dLegerServer = dLegerCommitLog.getdLegerServer(); + this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + this.dLegerServer = dLedgerCommitLog.getdLegerServer(); } @Override public void handle(long term, MemberState.Role role) { @@ -55,11 +55,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa switch (role) { case CANDIDATE: if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { - brokerController.changeToSlave(dLegerCommitLog.getId()); + brokerController.changeToSlave(dLedgerCommitLog.getId()); } break; case FOLLOWER: - brokerController.changeToSlave(dLegerCommitLog.getId()); + brokerController.changeToSlave(dLedgerCommitLog.getId()); break; case LEADER: while (dLegerServer.getMemberState().isLeader() diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d10e30ba..2528ef2d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -474,6 +474,7 @@ public class CommitLog { } // Commitlog case files are deleted else { + log.warn("The commitlog files are deleted, and delete the consume queue files"); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 828088a8..91d51530 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -52,7 +52,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.apache.rocketmq.store.dleger.DLegerCommitLog; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.index.QueryOffsetResult; @@ -119,7 +119,7 @@ public class DefaultMessageStore implements MessageStore { this.brokerStatsManager = brokerStatsManager; this.allocateMappedFileService = new AllocateMappedFileService(this); if (messageStoreConfig.isEnableDLegerCommitLog()) { - this.commitLog = new DLegerCommitLog(this); + this.commitLog = new DLedgerCommitLog(this); } else { this.commitLog = new CommitLog(this); } diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java similarity index 90% rename from store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java rename to store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 9255ba11..3dc443a9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.store.dleger; +package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dleger.DLegerConfig; import io.openmessaging.storage.dleger.DLegerServer; @@ -34,7 +34,6 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.AppendMessageResult; @@ -52,7 +51,7 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; /** * Store all metadata downtime for recovery, data protection reliability */ -public class DLegerCommitLog extends CommitLog { +public class DLedgerCommitLog extends CommitLog { private final DLegerServer dLegerServer; private final DLegerConfig dLegerConfig; private final DLegerMmapFileStore dLegerFileStore; @@ -66,7 +65,7 @@ public class DLegerCommitLog extends CommitLog { private volatile long beginTimeInDlegerLock = 0; - public DLegerCommitLog(final DefaultMessageStore defaultMessageStore) { + public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); dLegerConfig = new DLegerConfig(); dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); @@ -87,6 +86,7 @@ public class DLegerCommitLog extends CommitLog { } + @Override public boolean load() { /*boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); @@ -94,6 +94,7 @@ public class DLegerCommitLog extends CommitLog { return true; } + @Override public void start() { dLegerServer.startup(); /* this.flushCommitLogService.start(); @@ -103,6 +104,7 @@ public class DLegerCommitLog extends CommitLog { }*/ } + @Override public void shutdown() { /* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.commitLogService.shutdown(); @@ -111,11 +113,13 @@ public class DLegerCommitLog extends CommitLog { dLegerServer.shutdown(); } + @Override public long flush() { dLegerFileStore.flush(); return dLegerFileList.getFlushedWhere(); } + @Override public long getMaxOffset() { if (this.dLegerFileStore.getCommittedPos() != -1) { return dLegerFileStore.getCommittedPos(); @@ -124,14 +128,17 @@ public class DLegerCommitLog extends CommitLog { } } + @Override public long remainHowManyDataToCommit() { return dLegerFileList.remainHowManyDataToCommit(); } + @Override public long remainHowManyDataToFlush() { return dLegerFileList.remainHowManyDataToFlush(); } + @Override public int deleteExpiredFile( final long expiredTime, final int deleteFilesInterval, @@ -142,7 +149,7 @@ public class DLegerCommitLog extends CommitLog { } - private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { + static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { private SelectMmapBufferResult sbr; public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { @@ -182,11 +189,13 @@ public class DLegerCommitLog extends CommitLog { /** * Read CommitLog data, use data replication */ + @Override public SelectMappedBufferResult getData(final long offset) { return this.getData(offset, offset == 0); } + @Override public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { if (offset >= dLegerFileStore.getCommittedPos()) { return null; @@ -205,28 +214,27 @@ public class DLegerCommitLog extends CommitLog { /** * When the normal exit, data recovery, all memory data have been flush */ - public void recoverNormally() { + @Override + public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { } - public void recoverAbnormally() { + + @Override + public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { } + @Override public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) { return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true); } - private void doNothingForDeadCode(final Object obj) { - if (obj != null) { - log.debug(String.valueOf(obj.hashCode())); - } - } - /** * check the message and returns the message size * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ + @Override public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { try { @@ -251,22 +259,19 @@ public class DLegerCommitLog extends CommitLog { return new DispatchRequest(-1, false /* success */); } + @Override public long getConfirmOffset() { return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset() : this.dLegerFileStore.getCommittedPos(); } + @Override public void setConfirmOffset(long phyOffset) { log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset); } - - private void notifyMessageArriving() { - - } - + @Override public boolean resetOffset(long offset) { - //return this.mappedFileQueue.resetOffset(offset); return false; } @@ -349,7 +354,7 @@ public class DLegerCommitLog extends CommitLog { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information - DLegerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); break; default: break; @@ -408,43 +413,20 @@ public class DLegerCommitLog extends CommitLog { return putMessageResult; } - public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - - } - - public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - - } - @Override public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } - /** - * According to receive certain message or offset storage time if an error occurs, it returns -1 - */ - public long pickupStoreTimestamp(final long offset, final int size) { - if (offset >= this.getMinOffset()) { - SelectMappedBufferResult result = this.getMessage(offset, size); - if (null != result) { - try { - return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); - } finally { - result.release(); - } - } - } - - return -1; - } + @Override public long getMinOffset() { return dLegerFileList.getMinOffset(); } + @Override public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0); @@ -455,42 +437,40 @@ public class DLegerCommitLog extends CommitLog { return null; } + @Override public long rollNextFile(final long offset) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); return offset + mappedFileSize - offset % mappedFileSize; } + @Override public HashMap getTopicQueueTable() { return topicQueueTable; } + @Override public void setTopicQueueTable(HashMap topicQueueTable) { this.topicQueueTable = topicQueueTable; } + @Override public void destroy() { //TO DO } + @Override public boolean appendData(long startOffset, byte[] data) { //TO DO return false; } + @Override public boolean retryDeleteFirstFile(final long intervalForcibly) { - //TO DO return false; } - public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { - String key = topic + "-" + queueId; - synchronized (this) { - this.topicQueueTable.remove(key); - } - - log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); - } + @Override public void checkSelf() { dLegerFileList.checkSelf(); } @@ -560,10 +540,10 @@ public class DLegerCommitLog extends CommitLog { keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); - Long queueOffset = DLegerCommitLog.this.topicQueueTable.get(key); + Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; - DLegerCommitLog.this.topicQueueTable.put(key, queueOffset); + DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling @@ -603,7 +583,7 @@ public class DLegerCommitLog extends CommitLog { // Exceeds the maximum message if (msgLen > this.maxMessageSize) { - DLegerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key); } @@ -612,7 +592,7 @@ public class DLegerCommitLog extends CommitLog { // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE - this.msgStoreItemMemory.putInt(DLegerCommitLog.MESSAGE_MAGIC_CODE); + this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID @@ -642,18 +622,17 @@ public class DLegerCommitLog extends CommitLog { this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) + if (bodyLength > 0) { this.msgStoreItemMemory.put(msgInner.getBody()); + } // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) + if (propertiesLength > 0) { this.msgStoreItemMemory.put(propertiesData); - - final long beginTimeMills = DLegerCommitLog.this.defaultMessageStore.now(); - + } byte[] data = new byte[msgLen]; this.msgStoreItemMemory.clear(); this.msgStoreItemMemory.get(data); diff --git a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java similarity index 85% rename from store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java rename to store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java index 3f6b235c..7f5e0cd1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dleger/DLegerSelectMappedBufferResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerSelectMappedBufferResult.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.rocketmq.store.dleger; +package org.apache.rocketmq.store.dledger; import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { +public class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { private SelectMmapBufferResult sbr; - public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { + public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) { super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); this.sbr = sbr; } diff --git a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java similarity index 96% rename from store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java rename to store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index ed3397a4..36acc534 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dleger/DLegerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.store.dleger; +package org.apache.rocketmq.store.dledger; import java.io.File; import java.nio.ByteBuffer; @@ -22,7 +22,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Assert; import org.junit.Test; -public class DLegerCommitlogTest extends StoreTestBase { +public class DLedgerCommitlogTest extends StoreTestBase { private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId) throws Exception { baseDirs.add(base); @@ -43,7 +43,7 @@ public class DLegerCommitlogTest extends StoreTestBase { }, new BrokerConfig()); if (leaderId != null) { - DLegerServer dLegerServer = ((DLegerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer(); + DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer(); dLegerServer.getdLegerConfig().setEnableLeaderElector(false); if (selfId.equals(leaderId)) { dLegerServer.getMemberState().changeToLeader(-1); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/ProduceAndConsumeTest.java similarity index 99% rename from test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java rename to test/src/test/java/org/apache/rocketmq/test/base/dledger/ProduceAndConsumeTest.java index 30ddff4f..813447f2 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/dleger/ProduceAndConsumeTest.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/ProduceAndConsumeTest.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.test.base.dleger; +package org.apache.rocketmq.test.base.dledger; import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; -- GitLab