Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
5e259f86
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
5e259f86
编写于
12月 14, 2017
作者:
Y
yukon
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Avoid using guava Files in FileWatchService
上级
8c0759e3
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
70 addition
and
24 deletion
+70
-24
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+1
-1
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
...n/java/org/apache/rocketmq/namesrv/NamesrvController.java
+1
-2
srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
...in/java/org/apache/rocketmq/srvutil/FileWatchService.java
+23
-14
srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
...est/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
+45
-7
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
5e259f86
...
...
@@ -404,7 +404,7 @@ public class BrokerController {
((
NettyRemotingServer
)
fastRemotingServer
).
loadSslContext
();
}
});
}
catch
(
IO
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
warn
(
"FileWatchService created error, can't load the certificate dynamically"
);
}
}
...
...
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
浏览文件 @
5e259f86
...
...
@@ -16,7 +16,6 @@
*/
package
org.apache.rocketmq.namesrv
;
import
java.io.IOException
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
...
...
@@ -111,7 +110,7 @@ public class NamesrvController {
((
NettyRemotingServer
)
remotingServer
).
loadSslContext
();
}
});
}
catch
(
IO
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
warn
(
"FileWatchService created error, can't load the certificate dynamically"
);
}
}
...
...
srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
浏览文件 @
5e259f86
...
...
@@ -17,12 +17,14 @@
package
org.apache.rocketmq.srvutil
;
import
com.google.common.hash.HashCode
;
import
com.google.common.hash.Hashing
;
import
com.google.common.io.Files
;
import
java.io.File
;
import
java.io.IOException
;
import
java.nio.file.Files
;
import
java.nio.file.Path
;
import
java.nio.file.Paths
;
import
java.security.MessageDigest
;
import
java.security.NoSuchAlgorithmException
;
import
org.apache.rocketmq.common.ServiceThread
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -30,23 +32,23 @@ import org.slf4j.LoggerFactory;
public
class
FileWatchService
extends
ServiceThread
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
String
[]
watchFiles
;
private
boolean
[]
isFileChangedFlag
;
private
HashCode
[]
fileCurrentHash
;
private
Listener
listener
;
private
static
final
int
WATCH_INTERVAL
=
1
00
;
private
final
String
[]
watchFiles
;
private
final
boolean
[]
isFileChangedFlag
;
private
final
String
[]
fileCurrentHash
;
private
final
Listener
listener
;
private
static
final
int
WATCH_INTERVAL
=
5
00
;
private
MessageDigest
md
=
MessageDigest
.
getInstance
(
"MD5"
);
public
FileWatchService
(
final
String
[]
watchFiles
,
final
Listener
listener
)
throws
IO
Exception
{
final
Listener
listener
)
throws
Exception
{
this
.
watchFiles
=
watchFiles
;
this
.
listener
=
listener
;
this
.
isFileChangedFlag
=
new
boolean
[
watchFiles
.
length
];
this
.
fileCurrentHash
=
new
HashCode
[
watchFiles
.
length
];
this
.
fileCurrentHash
=
new
String
[
watchFiles
.
length
];
for
(
int
i
=
0
;
i
<
watchFiles
.
length
;
i
++)
{
isFileChangedFlag
[
i
]
=
false
;
fileCurrentHash
[
i
]
=
Files
.
hash
(
new
File
(
watchFiles
[
i
]),
Hashing
.
md5
()
);
fileCurrentHash
[
i
]
=
hash
(
watchFiles
[
i
]
);
}
}
...
...
@@ -65,7 +67,7 @@ public class FileWatchService extends ServiceThread {
boolean
allFileChanged
=
true
;
for
(
int
i
=
0
;
i
<
watchFiles
.
length
;
i
++)
{
HashCode
newHash
=
Files
.
hash
(
new
File
(
watchFiles
[
i
]),
Hashing
.
md5
()
);
String
newHash
=
hash
(
watchFiles
[
i
]
);
if
(!
newHash
.
equals
(
fileCurrentHash
[
i
]))
{
isFileChangedFlag
[
i
]
=
true
;
fileCurrentHash
[
i
]
=
newHash
;
...
...
@@ -86,6 +88,13 @@ public class FileWatchService extends ServiceThread {
log
.
info
(
this
.
getServiceName
()
+
" service end"
);
}
private
String
hash
(
String
filePath
)
throws
IOException
,
NoSuchAlgorithmException
{
Path
path
=
Paths
.
get
(
filePath
);
md
.
update
(
Files
.
readAllBytes
(
path
));
byte
[]
hash
=
md
.
digest
();
return
UtilAll
.
bytes2string
(
hash
);
}
public
interface
Listener
{
/**
* Will be called when the target files are changed
...
...
srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
浏览文件 @
5e259f86
...
...
@@ -20,6 +20,7 @@ package org.apache.rocketmq.srvutil;
import
java.io.File
;
import
java.io.IOException
;
import
java.io.PrintWriter
;
import
java.nio.file.NoSuchFileException
;
import
java.util.concurrent.Semaphore
;
import
java.util.concurrent.TimeUnit
;
import
org.junit.Rule
;
...
...
@@ -29,6 +30,7 @@ import org.junit.runner.RunWith;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
assertj
.
core
.
api
.
Java6Assertions
.
failBecauseExceptionWasNotThrown
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
FileWatchServiceTest
{
...
...
@@ -36,7 +38,7 @@ public class FileWatchServiceTest {
public
TemporaryFolder
tempFolder
=
new
TemporaryFolder
();
@Test
public
void
watchSingleFile
()
throws
IOException
,
Interrupted
Exception
{
public
void
watchSingleFile
()
throws
Exception
{
File
file
=
tempFolder
.
newFile
();
final
Semaphore
waitSemaphore
=
new
Semaphore
(
0
);
FileWatchService
fileWatchService
=
new
FileWatchService
(
new
String
[]
{
file
.
getAbsolutePath
()},
new
FileWatchService
.
Listener
()
{
...
...
@@ -47,13 +49,49 @@ public class FileWatchServiceTest {
});
fileWatchService
.
start
();
modifyFile
(
file
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
,
TimeUnit
.
MILLISECONDS
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
0
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
result
).
isTrue
();
}
@Test
public
void
watchSingleFile_NotExits
()
throws
Exception
{
File
file
=
tempFolder
.
newFile
();
final
Semaphore
waitSemaphore
=
new
Semaphore
(
0
);
try
{
FileWatchService
fileWatchService
=
new
FileWatchService
(
new
String
[]
{
file
.
getAbsolutePath
()
+
123
},
new
FileWatchService
.
Listener
()
{
@Override
public
void
onChanged
()
{
waitSemaphore
.
release
();
}
});
failBecauseExceptionWasNotThrown
(
NoSuchFileException
.
class
);
}
catch
(
Exception
e
)
{
assertThat
(
e
).
isInstanceOf
(
NoSuchFileException
.
class
);
}
}
@Test
public
void
watchSingleFile_FileDeleted
()
throws
Exception
{
File
file
=
tempFolder
.
newFile
();
final
Semaphore
waitSemaphore
=
new
Semaphore
(
0
);
FileWatchService
fileWatchService
=
new
FileWatchService
(
new
String
[]
{
file
.
getAbsolutePath
()},
new
FileWatchService
.
Listener
()
{
@Override
public
void
onChanged
()
{
waitSemaphore
.
release
();
}
});
fileWatchService
.
start
();
file
.
delete
();
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
1000
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
result
).
isFalse
();
file
.
createNewFile
();
modifyFile
(
file
);
result
=
waitSemaphore
.
tryAcquire
(
1
,
2000
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
result
).
isTrue
();
}
@Test
public
void
watchTwoFiles_ModifyOne
()
throws
IOException
,
Interrupted
Exception
{
public
void
watchTwoFiles_ModifyOne
()
throws
Exception
{
File
fileA
=
tempFolder
.
newFile
();
File
fileB
=
tempFolder
.
newFile
();
final
Semaphore
waitSemaphore
=
new
Semaphore
(
0
);
...
...
@@ -67,12 +105,12 @@ public class FileWatchServiceTest {
});
fileWatchService
.
start
();
modifyFile
(
fileA
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
,
TimeUnit
.
MILLISECONDS
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
0
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
result
).
isFalse
();
}
@Test
public
void
watchTwoFiles
()
throws
IOException
,
Interrupted
Exception
{
public
void
watchTwoFiles
()
throws
Exception
{
File
fileA
=
tempFolder
.
newFile
();
File
fileB
=
tempFolder
.
newFile
();
final
Semaphore
waitSemaphore
=
new
Semaphore
(
0
);
...
...
@@ -87,14 +125,14 @@ public class FileWatchServiceTest {
fileWatchService
.
start
();
modifyFile
(
fileA
);
modifyFile
(
fileB
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
,
TimeUnit
.
MILLISECONDS
);
boolean
result
=
waitSemaphore
.
tryAcquire
(
1
,
100
0
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
result
).
isTrue
();
}
private
static
void
modifyFile
(
File
file
)
{
try
{
PrintWriter
out
=
new
PrintWriter
(
file
);
out
.
println
(
System
.
currentTimeMillis
());
out
.
println
(
System
.
nanoTime
());
out
.
flush
();
out
.
close
();
}
catch
(
IOException
ignore
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录