diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index c5e38dbedb43f8b9ce95a2ecdd7304999899aca0..dc1c96c0a150cf4cdf22b394e8bc79bc9b6a45ce 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -77,19 +77,19 @@ public class ConsumeQueue { if (index < 0) index = 0; - int mapedFileSizeLogics = this.mappedFileSize; + int mappedFileSizeLogics = this.mappedFileSize; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); - long mapedFileOffset = 0; + long mappedFileOffset = 0; while (true) { - for (int i = 0; i < mapedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { + for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); int size = byteBuffer.getInt(); long tagsCode = byteBuffer.getLong(); if (offset >= 0 && size > 0) { - mapedFileOffset = i + CQ_STORE_UNIT_SIZE; + mappedFileOffset = i + CQ_STORE_UNIT_SIZE; this.maxPhysicOffset = offset; } else { log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " @@ -98,7 +98,7 @@ public class ConsumeQueue { } } - if (mapedFileOffset == mapedFileSizeLogics) { + if (mappedFileOffset == mappedFileSizeLogics) { index++; if (index >= mappedFiles.size()) { @@ -109,17 +109,17 @@ public class ConsumeQueue { mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); - mapedFileOffset = 0; + mappedFileOffset = 0; log.info("recover next consume queue file, " + mappedFile.getFileName()); } } else { log.info("recover current consume queue queue over " + mappedFile.getFileName() + " " - + (processOffset + mapedFileOffset)); + + (processOffset + mappedFileOffset)); break; } } - processOffset += mapedFileOffset; + processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); @@ -310,7 +310,7 @@ public class ConsumeQueue { if (offsetPy >= phyMinOffset) { this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; - log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: " + log.info("compute logics min offset: " + this.getMinOffsetInQueue() + ", topic: " + this.topic + ", queueId: " + this.queueId); break; } @@ -324,7 +324,7 @@ public class ConsumeQueue { } } - public long getMinOffsetInQuque() { + public long getMinOffsetInQueue() { return this.minLogicOffset / CQ_STORE_UNIT_SIZE; } @@ -435,8 +435,8 @@ public class ConsumeQueue { } public long rollNextFile(final long index) { - int mapedFileSize = this.mappedFileSize; - int totalUnitsInFile = mapedFileSize / CQ_STORE_UNIT_SIZE; + int mappedFileSize = this.mappedFileSize; + int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE; return index + totalUnitsInFile - index % totalUnitsInFile; } @@ -463,10 +463,10 @@ public class ConsumeQueue { } public long getMessageTotalInQueue() { - return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque(); + return this.getMaxOffsetInQueue() - this.getMinOffsetInQueue(); } - public long getMaxOffsetInQuque() { + public long getMaxOffsetInQueue() { return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 3a43c21b76d1fe14c4d9209e8d3d0d9d88476049..2594ef323412676935112436cf4889dbcfb8eb6d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -376,8 +376,8 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { - minOffset = consumeQueue.getMinOffsetInQuque(); - maxOffset = consumeQueue.getMaxOffsetInQuque(); + minOffset = consumeQueue.getMinOffsetInQueue(); + maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; @@ -499,7 +499,7 @@ public class DefaultMessageStore implements MessageStore { public long getMaxOffsetInQuque(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - long offset = logic.getMaxOffsetInQuque(); + long offset = logic.getMaxOffsetInQueue(); return offset; } @@ -512,7 +512,7 @@ public class DefaultMessageStore implements MessageStore { public long getMinOffsetInQuque(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - return logic.getMinOffsetInQuque(); + return logic.getMinOffsetInQueue(); } return -1; @@ -878,8 +878,8 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { - minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQuque()); - maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQuque()); + minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue()); + maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue()); if (maxOffset == 0) { return messageIds; @@ -1220,7 +1220,7 @@ public class DefaultMessageStore implements MessageStore { for (ConcurrentHashMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); - table.put(key, logic.getMaxOffsetInQuque()); + table.put(key, logic.getMaxOffsetInQueue()); logic.correctMinOffset(minPhyOffset); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index cd4b57b8b347d8a9f6606384fed15df8a59fb48f..feb505d6dba8754f37009a282a78b48c3953cda4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -43,9 +43,9 @@ public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4; protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0); + private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); - private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0); + private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); protected final AtomicInteger wrotePosition = new AtomicInteger(0); //ADD BY ChenYang protected final AtomicInteger committedPosition = new AtomicInteger(0); @@ -132,12 +132,12 @@ public class MappedFile extends ReferenceResource { return viewed(viewedBuffer); } - public static int getTotalmapedfiles() { - return TOTAL_MAPED_FILES.get(); + public static int getTotalMappedFiles() { + return TOTAL_MAPPED_FILES.get(); } - public static long getTotalMapedVitualMemory() { - return TOTAL_MAPED_VITUAL_MEMORY.get(); + public static long getTotalMappedVirtualMemory() { + return TOTAL_MAPPED_VIRTUAL_MEMORY.get(); } public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { @@ -158,8 +158,8 @@ public class MappedFile extends ReferenceResource { try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); - TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize); - TOTAL_MAPED_FILES.incrementAndGet(); + TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); + TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("create file channel " + this.fileName + " Failed. ", e); @@ -405,8 +405,8 @@ public class MappedFile extends ReferenceResource { } clean(this.mappedByteBuffer); - TOTAL_MAPED_VITUAL_MEMORY.addAndGet(this.fileSize * (-1)); - TOTAL_MAPED_FILES.decrementAndGet(); + TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1)); + TOTAL_MAPPED_FILES.decrementAndGet(); log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK"); return true; } @@ -431,7 +431,7 @@ public class MappedFile extends ReferenceResource { return true; } else { - log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName + log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index f98b26a194b71cdbde00068668e52be0cb21a745..5c6c62c6613001b0c48ad4317d102cd989a78b22 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -397,11 +397,15 @@ public class MappedFileQueue { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " + maxOffsetInLogicQueue + ", delete it"); } + } else if (!mappedFile.isAvailable()) { // Handle hanged file. + log.warn("Found a hanged consume queue file, attempting to delete it."); + destroy = true; } else { log.warn("this being not executed forever."); break; } + // TODO: Externalize this hardcoded value if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 5b42082af0824b4f1e9d149e55016ba8d9a9a9e4..e08a6f54d0ca9f7ade1065ce1b9dfd1c4df73c5a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -324,7 +324,7 @@ public class ScheduleMessageService extends ConfigManager { */ - long cqMinOffset = cq.getMinOffsetInQuque(); + long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index 90fbbd35015b9ce9ccb66dfdc166c8922d886241..2d6c112a035845f7a67142a10368cf5c0843ab02 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -20,6 +20,8 @@ */ package org.apache.rocketmq.store; +import java.nio.ByteBuffer; +import java.util.Arrays; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class MappedFileQueueTest { @@ -55,7 +58,7 @@ public class MappedFileQueueTest { } @Test - public void test_getLastMapedFile() { + public void test_getLastMappedFile() { final String fixedMsg = "0123456789abcdef"; logger.debug("================================================================"); @@ -79,7 +82,7 @@ public class MappedFileQueueTest { } @Test - public void test_findMapedFileByOffset() { + public void test_findMappedFileByOffset() { // four-byte string. final String fixedMsg = "abcd"; @@ -179,7 +182,7 @@ public class MappedFileQueueTest { } @Test - public void test_getMapedMemorySize() { + public void test_getMappedMemorySize() { final String fixedMsg = "abcd"; logger.debug("================================================================"); @@ -200,4 +203,45 @@ public class MappedFileQueueTest { mappedFileQueue.destroy(); logger.debug("MappedFileQueue.getMappedMemorySize() OK"); } + + + @Test + public void test_deleteExpiredFileByOffset() { + + logger.debug("================================================================"); + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/e", 5120, null); + + for (int i = 0; i < 2048; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertNotNull(mappedFile); + + ByteBuffer byteBuffer = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); + byteBuffer.putLong(i); + byte[] padding = new byte[12]; + Arrays.fill(padding, (byte)'0'); + byteBuffer.put(padding); + byteBuffer.flip(); + + boolean result = mappedFile.appendMessage(byteBuffer.array()); + + assertTrue(result); + } + + MappedFile first = mappedFileQueue.getFirstMappedFile(); + first.hold(); + + int count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE); + assertEquals(0, count); + first.release(); + + count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE); + assertTrue(count > 0); + first = mappedFileQueue.getFirstMappedFile(); + assertTrue(first.getFileFromOffset() > 0); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + logger.debug("MappedFileQueue.deleteExpiredFileByOffset() OK"); + } }