diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 18966d1766ca5f571b15c55d9427b2dbf9176a11..9823f76cf985a820cfb9c1728b65c491390ee8b0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -404,7 +404,7 @@ public class BrokerController { ((NettyRemotingServer) fastRemotingServer).loadSslContext(); } }); - } catch (IOException e) { + } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 00cebf576da6a4927bdc6694bbb702deb3530625..6ba8b38e226d377ff32b0e1ea95128a96e88adbe 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.namesrv; -import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -111,7 +110,7 @@ public class NamesrvController { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); - } catch (IOException e) { + } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java index b32e240b5109aeb0201ecd41e1517c8a76b2aa46..099b0276ebcdb065dc4469ed22fdf94c8915e670 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.srvutil; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; -import com.google.common.io.Files; -import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,23 +32,23 @@ import org.slf4j.LoggerFactory; public class FileWatchService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - private String [] watchFiles; - private boolean [] isFileChangedFlag; - private HashCode [] fileCurrentHash; - private Listener listener; - private static final int WATCH_INTERVAL = 100; - + private final String [] watchFiles; + private final boolean [] isFileChangedFlag; + private final String [] fileCurrentHash; + private final Listener listener; + private static final int WATCH_INTERVAL = 500; + private MessageDigest md = MessageDigest.getInstance("MD5"); public FileWatchService(final String [] watchFiles, - final Listener listener) throws IOException { + final Listener listener) throws Exception { this.watchFiles = watchFiles; this.listener = listener; this.isFileChangedFlag = new boolean[watchFiles.length]; - this.fileCurrentHash = new HashCode[watchFiles.length]; + this.fileCurrentHash = new String[watchFiles.length]; for (int i = 0; i < watchFiles.length; i++) { isFileChangedFlag[i] = false; - fileCurrentHash[i] = Files.hash(new File(watchFiles[i]), Hashing.md5()); + fileCurrentHash[i] = hash(watchFiles[i]); } } @@ -65,7 +67,7 @@ public class FileWatchService extends ServiceThread { boolean allFileChanged = true; for (int i = 0; i < watchFiles.length; i++) { - HashCode newHash = Files.hash(new File(watchFiles[i]), Hashing.md5()); + String newHash = hash(watchFiles[i]); if (!newHash.equals(fileCurrentHash[i])) { isFileChangedFlag[i] = true; fileCurrentHash[i] = newHash; @@ -86,6 +88,13 @@ public class FileWatchService extends ServiceThread { log.info(this.getServiceName() + " service end"); } + private String hash(String filePath) throws IOException, NoSuchAlgorithmException { + Path path = Paths.get(filePath); + md.update(Files.readAllBytes(path)); + byte[] hash = md.digest(); + return UtilAll.bytes2string(hash); + } + public interface Listener { /** * Will be called when the target files are changed diff --git a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java index 0d651480ada2c06cc8b9b3ef454731bfb7d31e50..6b411dbfd4341141d6f22fe091b484cb4b1a45da 100644 --- a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java +++ b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.srvutil; import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.nio.file.NoSuchFileException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.junit.Rule; @@ -29,6 +30,7 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Java6Assertions.failBecauseExceptionWasNotThrown; @RunWith(MockitoJUnitRunner.class) public class FileWatchServiceTest { @@ -36,7 +38,7 @@ public class FileWatchServiceTest { public TemporaryFolder tempFolder = new TemporaryFolder(); @Test - public void watchSingleFile() throws IOException, InterruptedException { + public void watchSingleFile() throws Exception { File file = tempFolder.newFile(); final Semaphore waitSemaphore = new Semaphore(0); FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() { @@ -47,13 +49,49 @@ public class FileWatchServiceTest { }); fileWatchService.start(); modifyFile(file); - boolean result = waitSemaphore.tryAcquire(1, 100, TimeUnit.MILLISECONDS); + boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS); assertThat(result).isTrue(); + } + + @Test + public void watchSingleFile_NotExits() throws Exception { + File file = tempFolder.newFile(); + final Semaphore waitSemaphore = new Semaphore(0); + try { + FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath() + 123}, new FileWatchService.Listener() { + @Override + public void onChanged() { + waitSemaphore.release(); + } + }); + failBecauseExceptionWasNotThrown(NoSuchFileException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(NoSuchFileException.class); + } + } + @Test + public void watchSingleFile_FileDeleted() throws Exception { + File file = tempFolder.newFile(); + final Semaphore waitSemaphore = new Semaphore(0); + FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() { + @Override + public void onChanged() { + waitSemaphore.release(); + } + }); + fileWatchService.start(); + file.delete(); + boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS); + assertThat(result).isFalse(); + file.createNewFile(); + modifyFile(file); + result = waitSemaphore.tryAcquire(1, 2000, TimeUnit.MILLISECONDS); + assertThat(result).isTrue(); } @Test - public void watchTwoFiles_ModifyOne() throws IOException, InterruptedException { + public void watchTwoFiles_ModifyOne() throws Exception { File fileA = tempFolder.newFile(); File fileB = tempFolder.newFile(); final Semaphore waitSemaphore = new Semaphore(0); @@ -67,12 +105,12 @@ public class FileWatchServiceTest { }); fileWatchService.start(); modifyFile(fileA); - boolean result = waitSemaphore.tryAcquire(1, 100, TimeUnit.MILLISECONDS); + boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS); assertThat(result).isFalse(); } @Test - public void watchTwoFiles() throws IOException, InterruptedException { + public void watchTwoFiles() throws Exception { File fileA = tempFolder.newFile(); File fileB = tempFolder.newFile(); final Semaphore waitSemaphore = new Semaphore(0); @@ -87,14 +125,14 @@ public class FileWatchServiceTest { fileWatchService.start(); modifyFile(fileA); modifyFile(fileB); - boolean result = waitSemaphore.tryAcquire(1, 100, TimeUnit.MILLISECONDS); + boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS); assertThat(result).isTrue(); } private static void modifyFile(File file) { try { PrintWriter out = new PrintWriter(file); - out.println(System.currentTimeMillis()); + out.println(System.nanoTime()); out.flush(); out.close(); } catch (IOException ignore) {