diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 14d24d66f3735bb9fe839bf6635256858849cc33..900e9af31fa5d51c8342be4ee8cffa23df49e822 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -114,6 +114,9 @@ public class DefaultMessageStore implements MessageStore { boolean shutDownNormal = false; + private final ScheduledExecutorService diskCheckScheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread")); + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; @@ -293,7 +296,7 @@ public class DefaultMessageStore implements MessageStore { this.shutdown = true; this.scheduledExecutorService.shutdown(); - + this.diskCheckScheduledExecutorService.shutdown(); try { Thread.sleep(1000); @@ -1329,6 +1332,11 @@ public class DefaultMessageStore implements MessageStore { // DefaultMessageStore.this.cleanExpiredConsumerQueue(); // } // }, 1, 1, TimeUnit.HOURS); + this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() { + public void run() { + DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); + } + }, 1000L, 10000L, TimeUnit.MILLISECONDS); } private void cleanFilesPeriodically() { @@ -1727,6 +1735,30 @@ public class DefaultMessageStore implements MessageStore { public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } + public boolean isSpaceFull() { + String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; + if (physicRatio > ratio) { + DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); + } + if (physicRatio > this.diskSpaceWarningLevelRatio) { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); + if (diskok) { + DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full"); + } + + return true; + } else { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); + + if (!diskok) { + DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok"); + } + + return false; + } + } } class CleanConsumeQueueService { diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index abf7356e8a3ca2eca223b81ccaa575c32ecbb0f0..7455dfea3c811381d86a7e733aea0394479eec7e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -72,6 +72,43 @@ public class DefaultMessageStoreCleanFilesTest { bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } + @Test + public void testIsSpaceFullFunctionEmpty2Full() throws Exception { + String deleteWhen = "04"; + // the min value of diskMaxUsedSpaceRatio. + int diskMaxUsedSpaceRatio = 1; + // used to set disk-full flag + double diskSpaceCleanForciblyRatio = 0.01D; + initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio); + // build and put 55 messages, exactly one message per CommitLog file. + buildAndPutMessagesToMessageStore(msgCount); + MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); + assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size()); + int fileCountConsumeQueue = getFileCountConsumeQueue(); + MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); + assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + cleanCommitLogService.isSpaceFull(); + assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4)); + messageStore.shutdown(); + messageStore.destroy(); + + } + + @Test + public void testIsSpaceFullFunctionFull2Empty() throws Exception { + String deleteWhen = "04"; + // the min value of diskMaxUsedSpaceRatio. + int diskMaxUsedSpaceRatio = 1; + //use to reset disk-full flag + double diskSpaceCleanForciblyRatio = 0.999D; + initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio); + //set disk full + messageStore.getRunningFlags().getAndMakeDiskFull(); + + cleanCommitLogService.isSpaceFull(); + assertEquals(0, messageStore.getRunningFlags().getFlagBits() & (1 << 4)); + } + @Test public void testDeleteExpiredFilesByTimeUp() throws Exception { String deleteWhen = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) + "";