diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index a018f68f627f7a8c648a3b53d747bf17c44d25c7..d176b86f7c2c00fbbd2ce44cd94e7d26121a13fa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -28,10 +28,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; -/** - * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and - * {@link BrokerController#pullThreadPoolQueue} - */ public class BrokerFastFailure { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index e544d90a124a0d4044b4087ab2c7cfaa05edfd22..34dc640aeab0c87c5776b0fa2a36e16b454243c6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -122,8 +122,8 @@ public class MessageStoreWithFilterTest { public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); - messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); + messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueue(cqFileSize); messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize); messageStoreConfig.setMessageIndexEnable(false); messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md index 3b64ed66ec99c4c4a60645c2a199be21bf8ada4c..39febbcddba2e2a722e30761591a3bc03f58f8a4 100755 --- a/docs/cn/best_practice.md +++ b/docs/cn/best_practice.md @@ -188,7 +188,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 | brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave | | storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 | | storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 | -| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​ +| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​ | deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​ | fileReservedTime | 72 | 以小时计算的文件保留时间 |​ | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​ diff --git a/docs/en/Operations_Broker.md b/docs/en/Operations_Broker.md index 2ec9f03763e46448e6dfaeab39ae3d89cd19eb4f..cf3ea581913b02b8772dcd9ec5270fdc00155700 100644 --- a/docs/en/Operations_Broker.md +++ b/docs/en/Operations_Broker.md @@ -16,7 +16,7 @@ ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much | brokerId | 0 | broker id, 0 means master, positive integers mean slave | | storePathCommitLog | $HOME/store/commitlog/ | file path for commit log | | storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue | -| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​ +| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​ | deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​ | fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​ | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​ diff --git a/docs/en/best_practice.md b/docs/en/best_practice.md index 0ee7387ee4b4214245122124e9b70939df36a51a..be3b937d1037b859e9c117e8f1a0152a33adcb6c 100755 --- a/docs/en/best_practice.md +++ b/docs/en/best_practice.md @@ -22,7 +22,7 @@ | brokerId | 0 | broker id, 0 means master, positive integers mean slave | | storePathCommitLog | $HOME/store/commitlog/ | file path for commit log | | storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue | -| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​ +| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​ | deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​ | fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​ | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​ diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index f28af3870c76e13b29c876996ae4cc413693b268..532367081a4532f16693a0deb10ce48bdf5a4a84 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -185,7 +185,7 @@ public class AllocateMappedFileService extends ServiceThread { // pre write mappedFile if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig() - .getMapedFileSizeCommitLog() + .getMappedFileSizeCommitLog() && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(), diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index cbcc1a7b728d1be62a43db41973a77090724abc2..b0bf8bbefda70449a660958f5ff453ac07c74c50 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -65,7 +65,7 @@ public class CommitLog { public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -144,7 +144,7 @@ public class CommitLog { } public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { - int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); @@ -828,7 +828,7 @@ public class CommitLog { } public SelectMappedBufferResult getMessage(final long offset, final int size) { - int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); @@ -838,7 +838,7 @@ public class CommitLog { } public long rollNextFile(final long offset) { - int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); return offset + mappedFileSize - offset % mappedFileSize; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 4b649789b281d09321e54a96299ff47226eedfc0..f539411f027719ba978f2f95d4243eaad721781c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1144,7 +1144,7 @@ public class DefaultMessageStore implements MessageStore { topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { @@ -1309,7 +1309,7 @@ public class DefaultMessageStore implements MessageStore { topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), this); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 93194656f4108d26c4fe995db5957f101d562a1b..e2c1d16b5c524533260b458123e045e9d3963784 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -49,7 +49,6 @@ public class MappedFile extends ReferenceResource { private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); protected final AtomicInteger wrotePosition = new AtomicInteger(0); - //ADD BY ChenYang protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); protected int fileSize; @@ -119,7 +118,6 @@ public class MappedFile extends ReferenceResource { private static ByteBuffer viewed(ByteBuffer buffer) { String methodName = "viewedBuffer"; - Method[] methods = buffer.getClass().getMethods(); for (int i = 0; i < methods.length; i++) { if (methods[i].getName().equals("attachment")) { @@ -166,10 +164,10 @@ public class MappedFile extends ReferenceResource { TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { - log.error("create file channel " + this.fileName + " Failed. ", e); + log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { - log.error("map file " + this.fileName + " Failed. ", e); + log.error("Failed to map file " + this.fileName, e); throw e; } finally { if (!ok && this.fileChannel != null) { @@ -207,7 +205,7 @@ public class MappedFile extends ReferenceResource { if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); - AppendMessageResult result = null; + AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { @@ -382,7 +380,6 @@ public class MappedFile extends ReferenceResource { public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { int readPosition = getReadPosition(); if ((pos + size) <= readPosition) { - if (this.hold()) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java index 1f826fedd891b6ff9c55ec56f2b48b2bbdbbe558..034b964bac2a8f7508171df783f8007659af5cab 100644 --- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java @@ -48,10 +48,6 @@ public class SelectMappedBufferResult { this.byteBuffer.limit(this.size); } - /* public MappedFile getMappedFile() { - return mappedFile; - }*/ - // @Override // protected void finalize() { // if (this.mappedFile != null) { diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java index 0de05911d993bc3b9f0e59f196374616271514d7..f692a99b1cc2296f6d67d009133db3a2f39871d3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java @@ -39,7 +39,7 @@ public class TransientStorePool { public TransientStorePool(final MessageStoreConfig storeConfig) { this.storeConfig = storeConfig; this.poolSize = storeConfig.getTransientStorePoolSize(); - this.fileSize = storeConfig.getMapedFileSizeCommitLog(); + this.fileSize = storeConfig.getMappedFileSizeCommitLog(); this.availableBuffers = new ConcurrentLinkedDeque<>(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index cb17345c3a1b6e4b9efccc15022d28f4d98553e3..2543e96d05790a89ac9bdd87461239c66aeb9809 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -31,9 +31,9 @@ public class MessageStoreConfig { + File.separator + "commitlog"; // CommitLog file size,default is 1G - private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; + private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W - private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; + private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; // enable consume queue ext private boolean enableConsumeQueueExt = false; // ConsumeQueue extend file size, 48M @@ -188,22 +188,22 @@ public class MessageStoreConfig { this.warmMapedFileEnable = warmMapedFileEnable; } - public int getMapedFileSizeCommitLog() { - return mapedFileSizeCommitLog; + public int getMappedFileSizeCommitLog() { + return mappedFileSizeCommitLog; } - public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) { - this.mapedFileSizeCommitLog = mapedFileSizeCommitLog; + public void setMappedFileSizeCommitLog(int mappedFileSizeCommitLog) { + this.mappedFileSizeCommitLog = mappedFileSizeCommitLog; } - public int getMapedFileSizeConsumeQueue() { + public int getMappedFileSizeConsumeQueue() { - int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); + int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE); } - public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) { - this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue; + public void setMappedFileSizeConsumeQueue(int mappedFileSizeConsumeQueue) { + this.mappedFileSizeConsumeQueue = mappedFileSizeConsumeQueue; } public boolean isEnableConsumeQueueExt() { diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index a48c1bed8bacbd84cc6ca0442087190d63b8a6a4..8f795d972c5bef965b3660ea65b79c3f7030a032 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -80,7 +80,7 @@ public class DLedgerCommitLog extends CommitLog { dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); - dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); + dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; @@ -514,7 +514,7 @@ public class DLedgerCommitLog extends CommitLog { @Override public long rollNextFile(final long offset) { - int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); return offset + mappedFileSize - offset % mappedFileSize; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index ce1b50b67a863a1c297136e7e303001e8fefddba..11af1e2feb05fe4959acb83337e4765b9af0dd00 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -227,7 +227,7 @@ public class HAConnection { masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() - .getMapedFileSizeCommitLog()); + .getMappedFileSizeCommitLog()); if (masterOffset < 0) { masterOffset = 0; diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index 7f88d36e9a7283cc494b5d83055b6cc954671c0d..fbb2a746c4f96686b273adb4e0847f927a6cc1ff 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -47,8 +47,8 @@ public class AppendCallbackTest { @Before public void init() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 38178490be8b484d6d5d42313cb224b51fdc33b0..0c8e5bb1472caf268cea6522de7ebf332a5db42c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -66,8 +66,8 @@ public class BatchPutMessageTest { private MessageStore buildMessageStore() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 470d70b9534d4ee216c5ba0b7ea03844f333b072..c9730306b5d6793811191697062b9efc3f8894a6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -86,8 +86,8 @@ public class ConsumeQueueTest { public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); - messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); + messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueue(cqFileSize); messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize); messageStoreConfig.setMessageIndexEnable(false); messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index b987cd385d9d6b6fe1eba33c6148aa4cab189874..fd510f11a55ae86a54bb919f43693240c0cba2c4 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -281,7 +281,7 @@ public class DefaultMessageStoreCleanFilesTest { } private int getMsgCountPerConsumeQueueMappedFile() { - int size = messageStore.getMessageStoreConfig().getMapedFileSizeConsumeQueue(); + int size = messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue(); return size / CQ_STORE_UNIT_SIZE;// 7 in this case } @@ -322,8 +322,8 @@ public class DefaultMessageStoreCleanFilesTest { private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); - messageStoreConfig.setMapedFileSizeCommitLog(mappedFileSize); - messageStoreConfig.setMapedFileSizeConsumeQueue(mappedFileSize); + messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); messageStoreConfig.setMaxHashSlotNum(10000); messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java index 78457dbfe8a7e78052c2c2a65ac719edd9ad56b0..e43bb4bc3d3f84ab382fee0adb492082c37a8222 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java @@ -64,8 +64,8 @@ public class DefaultMessageStoreShutDownTest { public DefaultMessageStore buildMessageStore() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); messageStoreConfig.setMaxHashSlotNum(10000); messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index d0f729340bdafc2bd82d040e82ca51234b457b7d..785977468e05138198b8da07ee28d10c17c404ef 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -40,7 +40,6 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @@ -76,8 +75,8 @@ public class DefaultMessageStoreTest { MessageBody = StoreMessage.getBytes(); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); @@ -106,8 +105,8 @@ public class DefaultMessageStoreTest { private MessageStore buildMessageStore() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); messageStoreConfig.setMaxHashSlotNum(10000); messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 83efcc1915e7a1845a652d9e872108930eacd74c..64495c33987b32d42e1a3c3adf813a6e958068e9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -22,7 +22,6 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Before; @@ -35,10 +34,7 @@ import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -141,8 +137,8 @@ public class HATest { } private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig){ - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); messageStoreConfig.setMaxHashSlotNum(10000); messageStoreConfig.setMaxIndexNum(100 * 100); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index e45178aaee9c50eba71ec355e48e6e6b589ef903..2da2fb7a2d5259a3fbbf5f4547607bee90e866f9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -43,8 +43,8 @@ public class MessageStoreTestBase extends StoreTestBase { System.setProperty("dledger.disk.ratio.clean", "0.95"); baseDirs.add(base); MessageStoreConfig storeConfig = new MessageStoreConfig(); - storeConfig.setMapedFileSizeCommitLog(1024 * 100); - storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMappedFileSizeCommitLog(1024 * 100); + storeConfig.setMappedFileSizeConsumeQueue(1024); storeConfig.setMaxHashSlotNum(100); storeConfig.setMaxIndexNum(100 * 10); storeConfig.setStorePathRootDir(base); @@ -98,8 +98,8 @@ public class MessageStoreTestBase extends StoreTestBase { protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception { baseDirs.add(base); MessageStoreConfig storeConfig = new MessageStoreConfig(); - storeConfig.setMapedFileSizeCommitLog(1024 * 100); - storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMappedFileSizeCommitLog(1024 * 100); + storeConfig.setMappedFileSizeConsumeQueue(1024); storeConfig.setMaxHashSlotNum(100); storeConfig.setMaxIndexNum(100 * 10); storeConfig.setStorePathRootDir(base); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java index a5242a35105649954be83ab668880018096b37b7..4aaa02908a8f0de03e03fb7304fb6df9b752d188 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -36,7 +36,7 @@ public class MixCommitlogTest extends MessageStoreTestBase { { DefaultMessageStore originalStore = createMessageStore(base, false); doPutMessages(originalStore, topic, 0, 1000, 0); - Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); + Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); Thread.sleep(500); Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0)); Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); @@ -83,7 +83,7 @@ public class MixCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, originalStore.dispatchBehindBytes()); dividedOffset = originalStore.getCommitLog().getMaxOffset(); - dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMappedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); doGetMessages(originalStore, topic, 0, 1000, 0); originalStore.shutdown(); } @@ -144,7 +144,7 @@ public class MixCommitlogTest extends MessageStoreTestBase { Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); Assert.assertEquals(0, originalStore.dispatchBehindBytes()); dividedOffset = originalStore.getCommitLog().getMaxOffset(); - dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMappedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); originalStore.shutdown(); } long maxPhysicalOffset; diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index befbefd28d6476ee88b1d82da9ad30cd40bf00ef..fd860e6b9d7ea5d8a3d2c6e3a4fee714cda35d57 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -91,8 +91,8 @@ public class ScheduleMessageServiceTest { public void init() throws Exception { messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMessageDelayLevel(testMessageDelayLevel); - messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); - messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); + messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueue(cqFileSize); messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize); messageStoreConfig.setMessageIndexEnable(false); messageStoreConfig.setEnableConsumeQueueExt(true); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 0eacd584fadd0843129911ac33e1891f2b64d1e1..c484e87c11b5380fdfb20d4e3cd7275440c26253 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -135,7 +135,7 @@ public class IntegrationTestBase { brokerConfig.setEnablePropertyFilter(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); - storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE); + storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE); storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); return createAndStartBroker(storeConfig, brokerConfig); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java index e2d9db1983e7a4254ffaa6835e90e66905ac59b8..2961f6b0b341c94ccdd43ac36eb7d22b29389735 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java @@ -35,7 +35,6 @@ import org.apache.rocketmq.test.base.IntegrationTestBase; import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ProducerFactory; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort; @@ -58,7 +57,7 @@ public class DLedgerProduceAndConsumeIT { storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setHaListenPort(nextPort()); - storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024); + storeConfig.setMappedFileSizeCommitLog(10 * 1024 * 1024); storeConfig.setEnableDLegerCommitLog(true); storeConfig.setdLegerGroup(brokerName); storeConfig.setdLegerSelfId(selfId);