Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
c67a80a2
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
c67a80a2
编写于
6年前
作者:
Z
Zhendong Liu
提交者:
GitHub
6年前
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into master
上级
0fff4a8d
efa8e457
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
82 addition
and
22 deletion
+82
-22
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+25
-5
broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
...org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+3
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+4
-0
broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
...java/org/apache/rocketmq/broker/util/ServiceProvider.java
+17
-15
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
...rc/main/java/org/apache/rocketmq/common/BrokerConfig.java
+33
-2
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
c67a80a2
...
...
@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import
org.apache.rocketmq.store.stats.BrokerStats
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
public
class
BrokerController
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
static
final
InternalLogger
LOG_PROTECTION
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
PROTECTION_LOGGER_NAME
);
...
...
@@ -131,6 +130,7 @@ public class BrokerController {
private
final
BlockingQueue
<
Runnable
>
clientManagerThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
heartbeatThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
consumerManagerThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
endTransactionThreadPoolQueue
;
private
final
FilterServerManager
filterServerManager
;
private
final
BrokerStatsManager
brokerStatsManager
;
private
final
List
<
SendMessageHook
>
sendMessageHookList
=
new
ArrayList
<
SendMessageHook
>();
...
...
@@ -146,6 +146,7 @@ public class BrokerController {
private
ExecutorService
clientManageExecutor
;
private
ExecutorService
heartbeatExecutor
;
private
ExecutorService
consumerManageExecutor
;
private
ExecutorService
endTransactionExecutor
;
private
boolean
updateMasterHAServerAddrPeriodically
=
false
;
private
BrokerStats
brokerStats
;
private
InetSocketAddress
storeHost
;
...
...
@@ -189,6 +190,7 @@ public class BrokerController {
this
.
clientManagerThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getClientManagerThreadPoolQueueCapacity
());
this
.
consumerManagerThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getConsumerManagerThreadPoolQueueCapacity
());
this
.
heartbeatThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getHeartbeatThreadPoolQueueCapacity
());
this
.
endTransactionThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getEndTransactionPoolQueueCapacity
());
this
.
brokerStatsManager
=
new
BrokerStatsManager
(
this
.
brokerConfig
.
getBrokerClusterName
());
this
.
setStoreHost
(
new
InetSocketAddress
(
this
.
getBrokerConfig
().
getBrokerIP1
(),
this
.
getNettyServerConfig
().
getListenPort
()));
...
...
@@ -289,8 +291,15 @@ public class BrokerController {
1000
*
60
,
TimeUnit
.
MILLISECONDS
,
this
.
heartbeatThreadPoolQueue
,
new
ThreadFactoryImpl
(
"HeartbeatThread_"
,
true
));
new
ThreadFactoryImpl
(
"HeartbeatThread_"
,
true
));
this
.
endTransactionExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getEndTransactionThreadPoolNums
(),
this
.
brokerConfig
.
getEndTransactionThreadPoolNums
(),
1000
*
60
,
TimeUnit
.
MILLISECONDS
,
this
.
endTransactionThreadPoolQueue
,
new
ThreadFactoryImpl
(
"EndTransactionThread_"
));
this
.
consumerManageExecutor
=
Executors
.
newFixedThreadPool
(
this
.
brokerConfig
.
getConsumerManageThreadPoolNums
(),
new
ThreadFactoryImpl
(
...
...
@@ -536,8 +545,8 @@ public class BrokerController {
/**
* EndTransactionProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
END_TRANSACTION
,
new
EndTransactionProcessor
(
this
),
this
.
sendMessage
Executor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
END_TRANSACTION
,
new
EndTransactionProcessor
(
this
),
this
.
sendMessage
Executor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
END_TRANSACTION
,
new
EndTransactionProcessor
(
this
),
this
.
endTransaction
Executor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
END_TRANSACTION
,
new
EndTransactionProcessor
(
this
),
this
.
endTransaction
Executor
);
/**
* Default
...
...
@@ -598,10 +607,15 @@ public class BrokerController {
return
this
.
headSlowTimeMills
(
this
.
queryThreadPoolQueue
);
}
public
long
headSlowTimeMills4EndTransactionThreadPoolQueue
()
{
return
this
.
headSlowTimeMills
(
this
.
endTransactionThreadPoolQueue
);
}
public
void
printWaterMark
()
{
LOG_WATER_MARK
.
info
(
"[WATERMARK] Send Queue Size: {} SlowTimeMills: {}"
,
this
.
sendThreadPoolQueue
.
size
(),
headSlowTimeMills4SendThreadPoolQueue
());
LOG_WATER_MARK
.
info
(
"[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}"
,
this
.
pullThreadPoolQueue
.
size
(),
headSlowTimeMills4PullThreadPoolQueue
());
LOG_WATER_MARK
.
info
(
"[WATERMARK] Query Queue Size: {} SlowTimeMills: {}"
,
this
.
queryThreadPoolQueue
.
size
(),
headSlowTimeMills4QueryThreadPoolQueue
());
LOG_WATER_MARK
.
info
(
"[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}"
,
this
.
endTransactionThreadPoolQueue
.
size
(),
headSlowTimeMills4EndTransactionThreadPoolQueue
());
}
public
MessageStore
getMessageStore
()
{
...
...
@@ -741,9 +755,11 @@ public class BrokerController {
if
(
this
.
fileWatchService
!=
null
)
{
this
.
fileWatchService
.
shutdown
();
}
if
(
this
.
transactionalMessageCheckService
!=
null
)
{
this
.
transactionalMessageCheckService
.
shutdown
(
false
);
if
(
this
.
endTransactionExecutor
!=
null
)
{
this
.
endTransactionExecutor
.
shutdown
();
}
}
...
...
@@ -1031,4 +1047,8 @@ public class BrokerController {
AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener
)
{
this
.
transactionalMessageCheckListener
=
transactionalMessageCheckListener
;
}
public
BlockingQueue
<
Runnable
>
getEndTransactionThreadPoolQueue
()
{
return
endTransactionThreadPoolQueue
;
}
}
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
浏览文件 @
c67a80a2
...
...
@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue
(
this
.
brokerController
.
getHeartbeatThreadPoolQueue
(),
this
.
brokerController
.
getBrokerConfig
().
getWaitTimeMillsInHeartbeatQueue
());
cleanExpiredRequestInQueue
(
this
.
brokerController
.
getEndTransactionThreadPoolQueue
(),
this
.
brokerController
.
getBrokerConfig
().
getWaitTimeMillsInTransactionQueue
());
}
void
cleanExpiredRequestInQueue
(
final
BlockingQueue
<
Runnable
>
blockingQueue
,
final
long
maxWaitTimeMillsInQueue
)
{
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
c67a80a2
...
...
@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo
.
put
(
"queryThreadPoolQueueCapacity"
,
String
.
valueOf
(
this
.
brokerController
.
getBrokerConfig
().
getQueryThreadPoolQueueCapacity
()));
runtimeInfo
.
put
(
"EndTransactionQueueSize"
,
String
.
valueOf
(
this
.
brokerController
.
getEndTransactionThreadPoolQueue
().
size
()));
runtimeInfo
.
put
(
"EndTransactionThreadPoolQueueCapacity"
,
String
.
valueOf
(
this
.
brokerController
.
getBrokerConfig
().
getEndTransactionPoolQueueCapacity
()));
runtimeInfo
.
put
(
"dispatchBehindBytes"
,
String
.
valueOf
(
this
.
brokerController
.
getMessageStore
().
dispatchBehindBytes
()));
runtimeInfo
.
put
(
"pageCacheLockTimeMills"
,
String
.
valueOf
(
this
.
brokerController
.
getMessageStore
().
lockTimeMills
()));
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
浏览文件 @
c67a80a2
...
...
@@ -125,23 +125,25 @@ public class ServiceProvider {
public
static
<
T
>
T
loadClass
(
String
name
,
Class
<?>
clazz
)
{
final
InputStream
is
=
getResourceAsStream
(
getContextClassLoader
(),
name
);
BufferedReader
reader
;
try
{
if
(
is
!=
null
)
{
BufferedReader
reader
;
try
{
reader
=
new
BufferedReader
(
new
InputStreamReader
(
is
,
"UTF-8"
));
}
catch
(
java
.
io
.
UnsupportedEncodingException
e
)
{
reader
=
new
BufferedReader
(
new
InputStreamReader
(
is
));
}
String
serviceName
=
reader
.
readLine
();
reader
.
close
();
if
(
serviceName
!=
null
&&
!
""
.
equals
(
serviceName
))
{
return
initService
(
getContextClassLoader
(),
serviceName
,
clazz
);
}
else
{
LOG
.
warn
(
"ServiceName is empty!"
);
return
null
;
try
{
reader
=
new
BufferedReader
(
new
InputStreamReader
(
is
,
"UTF-8"
));
}
catch
(
java
.
io
.
UnsupportedEncodingException
e
)
{
reader
=
new
BufferedReader
(
new
InputStreamReader
(
is
));
}
String
serviceName
=
reader
.
readLine
();
reader
.
close
();
if
(
serviceName
!=
null
&&
!
""
.
equals
(
serviceName
))
{
return
initService
(
getContextClassLoader
(),
serviceName
,
clazz
);
}
else
{
LOG
.
warn
(
"ServiceName is empty!"
);
return
null
;
}
}
catch
(
Exception
e
)
{
LOG
.
warn
(
"Error occurred when looking for resource file "
+
name
,
e
);
}
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Error occured when looking for resource file "
+
name
,
e
);
}
return
null
;
}
...
...
This diff is collapsed.
Click to expand it.
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
浏览文件 @
c67a80a2
...
...
@@ -63,7 +63,12 @@ public class BrokerConfig {
private
int
adminBrokerThreadPoolNums
=
16
;
private
int
clientManageThreadPoolNums
=
32
;
private
int
consumerManageThreadPoolNums
=
32
;
private
int
heartbeatThreadPoolNums
=
Math
.
min
(
32
,
Runtime
.
getRuntime
().
availableProcessors
());
private
int
heartbeatThreadPoolNums
=
Math
.
min
(
32
,
Runtime
.
getRuntime
().
availableProcessors
());
/**
* Thread numbers for EndTransactionProcessor
*/
private
int
endTransactionThreadPoolNums
=
8
+
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
private
int
flushConsumerOffsetInterval
=
1000
*
5
;
...
...
@@ -79,6 +84,7 @@ public class BrokerConfig {
private
int
clientManagerThreadPoolQueueCapacity
=
1000000
;
private
int
consumerManagerThreadPoolQueueCapacity
=
1000000
;
private
int
heartbeatThreadPoolQueueCapacity
=
50000
;
private
int
endTransactionPoolQueueCapacity
=
100000
;
private
int
filterServerNums
=
0
;
...
...
@@ -111,6 +117,7 @@ public class BrokerConfig {
private
long
waitTimeMillsInSendQueue
=
200
;
private
long
waitTimeMillsInPullQueue
=
5
*
1000
;
private
long
waitTimeMillsInHeartbeatQueue
=
31
*
1000
;
private
long
waitTimeMillsInTransactionQueue
=
3
*
1000
;
private
long
startAcceptSendRequestTimeStamp
=
0L
;
...
...
@@ -156,7 +163,7 @@ public class BrokerConfig {
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private
int
transactionCheckMax
=
5
;
private
int
transactionCheckMax
=
1
5
;
/**
* Transaction message check interval.
...
...
@@ -701,4 +708,28 @@ public class BrokerConfig {
public
void
setTransactionCheckInterval
(
long
transactionCheckInterval
)
{
this
.
transactionCheckInterval
=
transactionCheckInterval
;
}
public
int
getEndTransactionThreadPoolNums
()
{
return
endTransactionThreadPoolNums
;
}
public
void
setEndTransactionThreadPoolNums
(
int
endTransactionThreadPoolNums
)
{
this
.
endTransactionThreadPoolNums
=
endTransactionThreadPoolNums
;
}
public
int
getEndTransactionPoolQueueCapacity
()
{
return
endTransactionPoolQueueCapacity
;
}
public
void
setEndTransactionPoolQueueCapacity
(
int
endTransactionPoolQueueCapacity
)
{
this
.
endTransactionPoolQueueCapacity
=
endTransactionPoolQueueCapacity
;
}
public
long
getWaitTimeMillsInTransactionQueue
()
{
return
waitTimeMillsInTransactionQueue
;
}
public
void
setWaitTimeMillsInTransactionQueue
(
long
waitTimeMillsInTransactionQueue
)
{
this
.
waitTimeMillsInTransactionQueue
=
waitTimeMillsInTransactionQueue
;
}
}
This diff is collapsed.
Click to expand it.
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录
新手
引导
客服
返回
顶部