Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
38554a08
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 接近 3 年
通知
259
Star
16136
Fork
69
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
38554a08
编写于
12月 13, 2018
作者:
D
dongeforever
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Polish ServiceThread to make inner thread private
上级
698ef2f3
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
20 addition
and
15 deletion
+20
-15
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
...c/main/java/org/apache/rocketmq/common/ServiceThread.java
+16
-4
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
.../org/apache/rocketmq/store/AllocateMappedFileService.java
+2
-9
store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
.../main/java/org/apache/rocketmq/store/ha/HAConnection.java
+2
-2
未找到文件。
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
浏览文件 @
38554a08
...
...
@@ -27,10 +27,11 @@ public abstract class ServiceThread implements Runnable {
private
static
final
long
JOIN_TIME
=
90
*
1000
;
pr
otected
Thread
thread
;
pr
ivate
Thread
thread
;
protected
final
CountDownLatch2
waitPoint
=
new
CountDownLatch2
(
1
);
protected
volatile
AtomicBoolean
hasNotified
=
new
AtomicBoolean
(
false
);
protected
volatile
boolean
stopped
=
false
;
protected
boolean
isDaemon
=
false
;
//Make it able to restart the thread
private
final
AtomicBoolean
started
=
new
AtomicBoolean
(
false
);
...
...
@@ -48,6 +49,7 @@ public abstract class ServiceThread implements Runnable {
}
stopped
=
false
;
this
.
thread
=
new
Thread
(
this
,
getServiceName
());
this
.
thread
.
setDaemon
(
isDaemon
);
this
.
thread
.
start
();
}
...
...
@@ -56,7 +58,7 @@ public abstract class ServiceThread implements Runnable {
}
public
void
shutdown
(
final
boolean
interrupt
)
{
log
.
info
(
"Try to s
tart
service thread:{} started:{} lastThread:{}"
,
getServiceName
(),
started
.
get
(),
thread
);
log
.
info
(
"Try to s
hutdown
service thread:{} started:{} lastThread:{}"
,
getServiceName
(),
started
.
get
(),
thread
);
if
(!
started
.
compareAndSet
(
true
,
false
))
{
return
;
}
...
...
@@ -88,12 +90,14 @@ public abstract class ServiceThread implements Runnable {
return
JOIN_TIME
;
}
@Deprecated
public
void
stop
()
{
this
.
stop
(
false
);
}
@Deprecated
public
void
stop
(
final
boolean
interrupt
)
{
if
(!
started
.
get
(
))
{
if
(!
started
.
compareAndSet
(
true
,
false
))
{
return
;
}
this
.
stopped
=
true
;
...
...
@@ -109,7 +113,7 @@ public abstract class ServiceThread implements Runnable {
}
public
void
makeStop
()
{
if
(!
started
.
get
(
))
{
if
(!
started
.
compareAndSet
(
true
,
false
))
{
return
;
}
this
.
stopped
=
true
;
...
...
@@ -147,4 +151,12 @@ public abstract class ServiceThread implements Runnable {
public
boolean
isStopped
()
{
return
stopped
;
}
public
boolean
isDaemon
()
{
return
isDaemon
;
}
public
void
setDaemon
(
boolean
daemon
)
{
isDaemon
=
daemon
;
}
}
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
浏览文件 @
38554a08
...
...
@@ -120,16 +120,9 @@ public class AllocateMappedFileService extends ServiceThread {
return
AllocateMappedFileService
.
class
.
getSimpleName
();
}
@Override
public
void
shutdown
()
{
this
.
stopped
=
true
;
this
.
thread
.
interrupt
();
try
{
this
.
thread
.
join
(
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"Interrupted"
,
e
);
}
super
.
shutdown
(
true
);
for
(
AllocateRequest
req
:
this
.
requestTable
.
values
())
{
if
(
req
.
mappedFile
!=
null
)
{
log
.
info
(
"delete pre allocated maped file, {}"
,
req
.
mappedFile
.
getFileName
());
...
...
store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
浏览文件 @
38554a08
...
...
@@ -90,7 +90,7 @@ public class HAConnection {
this
.
selector
=
RemotingUtil
.
openSelector
();
this
.
socketChannel
=
socketChannel
;
this
.
socketChannel
.
register
(
this
.
selector
,
SelectionKey
.
OP_READ
);
this
.
thread
.
setDaemon
(
true
);
this
.
setDaemon
(
true
);
}
@Override
...
...
@@ -205,7 +205,7 @@ public class HAConnection {
this
.
selector
=
RemotingUtil
.
openSelector
();
this
.
socketChannel
=
socketChannel
;
this
.
socketChannel
.
register
(
this
.
selector
,
SelectionKey
.
OP_WRITE
);
this
.
thread
.
setDaemon
(
true
);
this
.
setDaemon
(
true
);
}
@Override
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录