Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
7c2b40c3
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
7c2b40c3
编写于
10月 10, 2018
作者:
H
hujie
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
save
上级
77b9bc09
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
366 addition
and
219 deletion
+366
-219
acl-plug/pom.xml
acl-plug/pom.xml
+2
-2
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
...a/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
+32
-12
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java
.../java/org/apache/rocketmq/acl/plug/AclPlugController.java
+3
-1
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java
.../java/org/apache/rocketmq/acl/plug/AclRemotingServer.java
+0
-3
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
...ain/java/org/apache/rocketmq/acl/plug/Authentication.java
+2
-2
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java
...pache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java
+0
-7
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
...lug/engine/AuthenticationInfoManagementAclPlugEngine.java
+12
-1
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
...ache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
+9
-3
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
...g/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
+2
-4
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
...g/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
+0
-1
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
.../apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
+6
-59
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java
.../rocketmq/acl/plug/entity/ControllerParametersEntity.java
+20
-6
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java
...ocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java
+0
-1
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java
...g/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java
+34
-2
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java
...a/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java
+16
-0
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
.../test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
+16
-2
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java
...java/org/apache/rocketmq/acl/plug/AuthenticationTest.java
+47
-6
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
...ache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
+51
-8
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java
...he/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java
+16
-0
acl-plug/src/test/resources/conf/transport.yml
acl-plug/src/test/resources/conf/transport.yml
+4
-4
broker/pom.xml
broker/pom.xml
+74
-74
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+3
-4
broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
...che/rocketmq/broker/client/ClientHousekeepingService.java
+3
-1
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
...rc/main/java/org/apache/rocketmq/common/BrokerConfig.java
+10
-12
distribution/conf/transport.yml
distribution/conf/transport.yml
+4
-4
未找到文件。
acl-plug/pom.xml
浏览文件 @
7c2b40c3
<?xml version="1.0"?>
<project
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
>
xmlns=
"http://maven.apache.org/POM/4.0.0"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
org.apache.rocketmq
</groupId>
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
浏览文件 @
7c2b40c3
...
...
@@ -22,13 +22,32 @@ import java.util.Iterator;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.rocketmq.acl.plug.annotation.RequestCode
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
import
org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException
;
public
class
AccessContralAnalysis
{
private
Map
<
Class
<?>,
Map
<
Integer
,
Field
>>
classTocodeAndMentod
=
new
HashMap
<>();
private
Map
<
String
,
Integer
>
fieldNameAndCode
=
new
HashMap
<>();
public
void
analysisClass
(
Class
<?>
clazz
)
{
Field
[]
fields
=
clazz
.
getDeclaredFields
();
try
{
for
(
Field
field
:
fields
)
{
if
(
field
.
getType
().
equals
(
int
.
class
))
{
String
name
=
StringUtils
.
replace
(
field
.
getName
(),
"_"
,
""
).
toLowerCase
();
fieldNameAndCode
.
put
(
name
,
(
Integer
)
field
.
get
(
null
));
}
}
}
catch
(
IllegalArgumentException
|
IllegalAccessException
e
)
{
throw
new
AclPlugAccountAnalysisException
(
String
.
format
(
"analysis on failure Class is %s"
,
clazz
.
getName
()),
e
);
}
}
public
Map
<
Integer
,
Boolean
>
analysis
(
AccessControl
accessControl
)
{
Class
<?
extends
AccessControl
>
clazz
=
accessControl
.
getClass
();
Map
<
Integer
,
Field
>
codeAndField
=
classTocodeAndMentod
.
get
(
clazz
);
...
...
@@ -36,18 +55,19 @@ public class AccessContralAnalysis {
codeAndField
=
new
HashMap
<>();
Field
[]
fields
=
clazz
.
getDeclaredFields
();
for
(
Field
field
:
fields
)
{
RequestCode
requestCode
=
field
.
getAnnotation
(
RequestCode
.
class
);
if
(
requestCode
!=
null
)
{
int
code
=
requestCode
.
code
();
if
(
codeAndField
.
containsKey
(
code
))
{
}
else
{
field
.
setAccessible
(
true
);
codeAndField
.
put
(
code
,
field
);
}
}
if
(!
field
.
getType
().
equals
(
boolean
.
class
))
continue
;
Integer
code
=
fieldNameAndCode
.
get
(
field
.
getName
().
toLowerCase
());
if
(
code
==
null
)
{
throw
new
AclPlugAccountAnalysisException
(
String
.
format
(
"field nonexistent in code"
,
field
.
getName
()));
}
field
.
setAccessible
(
true
);
codeAndField
.
put
(
code
,
field
);
}
if
(
codeAndField
.
isEmpty
())
{
throw
new
AclPlugAccountAnalysisException
(
String
.
format
(
"AccessControl nonexistent code , name %s"
,
accessControl
.
getClass
().
getName
()));
}
classTocodeAndMentod
.
put
(
clazz
,
codeAndField
);
}
Iterator
<
Entry
<
Integer
,
Field
>>
it
=
codeAndField
.
entrySet
().
iterator
();
...
...
@@ -57,8 +77,8 @@ public class AccessContralAnalysis {
Entry
<
Integer
,
Field
>
e
=
it
.
next
();
authority
.
put
(
e
.
getKey
(),
(
Boolean
)
e
.
getValue
().
get
(
accessControl
));
}
}
catch
(
IllegalArgumentException
|
IllegalAccessException
e
1
)
{
e1
.
printStackTrace
(
);
}
catch
(
IllegalArgumentException
|
IllegalAccessException
e
)
{
throw
new
AclPlugAccountAnalysisException
(
String
.
format
(
"analysis on failure AccessControl is %s"
,
AccessControl
.
class
.
getName
()),
e
);
}
return
authority
;
}
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclPlugController.java
浏览文件 @
7c2b40c3
...
...
@@ -47,7 +47,9 @@ public class AclPlugController {
}
public
void
doChannelCloseEvent
(
String
remoteAddr
)
{
aclPlugEngine
.
deleteLoginInfo
(
remoteAddr
);
if
(
this
.
startSucceed
)
{
aclPlugEngine
.
deleteLoginInfo
(
remoteAddr
);
}
}
public
boolean
isStartSucceed
()
{
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AclRemotingServer.java
浏览文件 @
7c2b40c3
...
...
@@ -16,14 +16,11 @@
*/
package
org.apache.rocketmq.acl.plug
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
import
org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl
;
public
interface
AclRemotingServer
{
public
AuthenticationInfo
login
();
public
AuthenticationResult
eachCheck
(
LoginOrRequestAccessControl
accessControl
);
}
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
浏览文件 @
7c2b40c3
...
...
@@ -45,7 +45,7 @@ public class Authentication {
authenticationResult
.
setResultString
(
String
.
format
(
"noPermitSendTopic include %s"
,
topicName
));
return
false
;
}
return
tru
e
;
return
borker
.
getPermitSendTopic
().
isEmpty
()
?
true
:
fals
e
;
}
else
if
(
code
==
11
)
{
if
(
borker
.
getPermitPullTopic
().
contains
(
topicName
))
{
return
true
;
...
...
@@ -54,7 +54,7 @@ public class Authentication {
authenticationResult
.
setResultString
(
String
.
format
(
"noPermitPullTopic include %s"
,
topicName
));
return
false
;
}
return
tru
e
;
return
borker
.
getPermitPullTopic
().
isEmpty
()
?
true
:
fals
e
;
}
return
true
;
}
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServerImpl.java
浏览文件 @
7c2b40c3
...
...
@@ -17,7 +17,6 @@
package
org.apache.rocketmq.acl.plug
;
import
org.apache.rocketmq.acl.plug.engine.AclPlugEngine
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
import
org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl
;
import
org.apache.rocketmq.acl.plug.exception.AclPlugAuthenticationException
;
...
...
@@ -32,12 +31,6 @@ public class DefaultAclRemotingServerImpl implements AclRemotingServer {
this
.
aclPlugEngine
=
aclPlugEngine
;
}
@Override
public
AuthenticationInfo
login
()
{
return
null
;
}
@Override
public
AuthenticationResult
eachCheck
(
LoginOrRequestAccessControl
accessControl
)
{
AuthenticationResult
authenticationResult
=
aclPlugEngine
.
eachCheckLoginAndAuthentication
(
accessControl
);
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
浏览文件 @
7c2b40c3
...
...
@@ -19,13 +19,13 @@ package org.apache.rocketmq.acl.plug.engine;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.apache.rocketmq.acl.plug.AccessContralAnalysis
;
import
org.apache.rocketmq.acl.plug.Authentication
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
import
org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport
;
import
org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity
;
import
org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl
;
import
org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException
;
import
org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy
;
...
...
@@ -48,7 +48,18 @@ public abstract class AuthenticationInfoManagementAclPlugEngine implements AclPl
private
Authentication
authentication
=
new
Authentication
();
ControllerParametersEntity
controllerParametersEntity
;
public
AuthenticationInfoManagementAclPlugEngine
(
ControllerParametersEntity
controllerParametersEntity
)
{
this
.
controllerParametersEntity
=
controllerParametersEntity
;
accessContralAnalysis
.
analysisClass
(
controllerParametersEntity
.
getAccessContralAnalysisClass
());
}
public
void
setAccessControl
(
AccessControl
accessControl
)
throws
AclPlugAccountAnalysisException
{
if
(
accessControl
.
getAccount
()
==
null
||
accessControl
.
getPassword
()
==
null
||
accessControl
.
getAccount
().
length
()
<=
6
||
accessControl
.
getPassword
().
length
()
<=
6
)
{
throw
new
AclPlugAccountAnalysisException
(
String
.
format
(
"The account password cannot be null and is longer than 6, account is %s password is %s"
,
accessControl
.
getAccount
(),
accessControl
.
getPassword
()));
}
try
{
NetaddressStrategy
netaddressStrategy
=
netaddressStrategyFactory
.
getNetaddressStrategy
(
accessControl
);
Map
<
String
,
AuthenticationInfo
>
accessControlAddressMap
=
accessControlMap
.
get
(
accessControl
.
getAccount
());
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
浏览文件 @
7c2b40c3
...
...
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
import
org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity
;
import
org.apache.rocketmq.acl.plug.entity.LoginInfo
;
import
org.apache.rocketmq.acl.plug.entity.LoginOrRequestAccessControl
;
...
...
@@ -29,6 +30,11 @@ public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagemen
private
Map
<
String
,
LoginInfo
>
loginInfoMap
=
new
ConcurrentHashMap
<>();
public
LoginInfoAclPlugEngine
(
ControllerParametersEntity
controllerParametersEntity
)
{
super
(
controllerParametersEntity
);
}
public
LoginInfo
getLoginInfo
(
AccessControl
accessControl
)
{
LoginInfo
loginInfo
=
loginInfoMap
.
get
(
accessControl
.
getRecognition
());
if
(
loginInfo
==
null
)
{
...
...
@@ -51,9 +57,9 @@ public abstract class LoginInfoAclPlugEngine extends AuthenticationInfoManagemen
protected
AuthenticationInfo
getAuthenticationInfo
(
LoginOrRequestAccessControl
accessControl
,
AuthenticationResult
authenticationResult
)
{
LoginInfo
anthenticatio
nInfo
=
getLoginInfo
(
accessControl
);
if
(
anthenticationInfo
!=
null
&&
anthenticatio
nInfo
.
getAuthenticationInfo
()
!=
null
)
{
return
anthenticatio
nInfo
.
getAuthenticationInfo
();
LoginInfo
logi
nInfo
=
getLoginInfo
(
accessControl
);
if
(
loginInfo
!=
null
&&
logi
nInfo
.
getAuthenticationInfo
()
!=
null
)
{
return
logi
nInfo
.
getAuthenticationInfo
();
}
authenticationResult
.
setResultString
(
"Login information does not exist, Please check login, password, IP"
);
return
null
;
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
浏览文件 @
7c2b40c3
...
...
@@ -19,7 +19,6 @@ package org.apache.rocketmq.acl.plug.engine;
import
java.io.File
;
import
java.io.FileInputStream
;
import
java.io.IOException
;
import
org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport
;
import
org.apache.rocketmq.acl.plug.entity.ControllerParametersEntity
;
import
org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException
;
...
...
@@ -29,9 +28,8 @@ public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
private
ControllerParametersEntity
controllerParametersEntity
;
public
PlainAclPlugEngine
(
ControllerParametersEntity
controllerParametersEntity
)
throws
AclPlugAccountAnalysisException
{
this
.
controllerParametersEntity
=
controllerParametersEntity
;
public
PlainAclPlugEngine
(
ControllerParametersEntity
controllerParametersEntity
)
throws
AclPlugAccountAnalysisException
{
super
(
controllerParametersEntity
);
init
();
}
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
浏览文件 @
7c2b40c3
...
...
@@ -19,7 +19,6 @@ package org.apache.rocketmq.acl.plug.entity;
import
java.util.Iterator
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy
;
public
class
AuthenticationInfo
{
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
浏览文件 @
7c2b40c3
...
...
@@ -18,170 +18,117 @@ package org.apache.rocketmq.acl.plug.entity;
import
java.util.HashSet
;
import
java.util.Set
;
import
org.apache.rocketmq.acl.plug.annotation.RequestCode
;
public
class
BorkerAccessControl
extends
AccessControl
{
public
BorkerAccessControl
()
{
}
private
Set
<
String
>
permitSendTopic
=
new
HashSet
<>();
private
Set
<
String
>
noPermitSendTopic
=
new
HashSet
<>();
private
Set
<
String
>
permitPullTopic
=
new
HashSet
<>();
private
Set
<
String
>
noPermitPullTopic
=
new
HashSet
<>();
@RequestCode
(
code
=
10
)
private
boolean
sendMessage
=
true
;
@RequestCode
(
code
=
310
)
private
boolean
sendMessageV2
=
true
;
@RequestCode
(
code
=
320
)
private
boolean
sendBatchMessage
=
true
;
@RequestCode
(
code
=
36
)
private
boolean
consumerSendMsgBack
=
true
;
@RequestCode
(
code
=
11
)
private
boolean
pullMessage
=
true
;
@RequestCode
(
code
=
12
)
private
boolean
queryMessage
=
true
;
@RequestCode
(
code
=
33
)
private
boolean
viewMessageById
=
true
;
@RequestCode
(
code
=
34
)
private
boolean
heartBeat
=
true
;
@RequestCode
(
code
=
35
)
private
boolean
unregisterClient
=
true
;
@RequestCode
(
code
=
46
)
private
boolean
checkClientConfig
=
true
;
@RequestCode
(
code
=
38
)
private
boolean
getConsumerListByGroup
=
true
;
@RequestCode
(
code
=
15
)
private
boolean
updateConsumerOffset
=
true
;
@RequestCode
(
code
=
14
)
private
boolean
queryConsumerOffset
=
true
;
@RequestCode
(
code
=
37
)
private
boolean
endTransaction
=
true
;
@RequestCode
(
code
=
17
)
private
boolean
updateAndCreateTopic
=
true
;
@RequestCode
(
code
=
215
)
private
boolean
deleteTopicInbroker
=
true
;
@RequestCode
(
code
=
21
)
private
boolean
getAllTopicConfig
=
true
;
@RequestCode
(
code
=
25
)
private
boolean
updateBrokerConfig
=
true
;
@RequestCode
(
code
=
26
)
private
boolean
getBrokerConfig
=
true
;
@RequestCode
(
code
=
29
)
private
boolean
searchOffsetByTimestamp
=
true
;
@RequestCode
(
code
=
30
)
private
boolean
getMaxOffset
=
true
;
@RequestCode
(
code
=
31
)
private
boolean
getMixOffset
=
true
;
@RequestCode
(
code
=
32
)
private
boolean
getEarliestMsgStoretime
=
true
;
@RequestCode
(
code
=
28
)
private
boolean
getBrokerRuntimeInfo
=
true
;
@RequestCode
(
code
=
41
)
private
boolean
lockBatchMQ
=
true
;
@RequestCode
(
code
=
42
)
private
boolean
unlockBatchMQ
=
true
;
@RequestCode
(
code
=
200
)
private
boolean
updateAndCreteSubscriptiongroup
=
true
;
@RequestCode
(
code
=
201
)
private
boolean
getAllSubscriptiongroupConfig
=
true
;
@RequestCode
(
code
=
207
)
private
boolean
deleteSubscriptiongroup
=
true
;
@RequestCode
(
code
=
202
)
private
boolean
getTopicStatsInfo
=
true
;
@RequestCode
(
code
=
203
)
private
boolean
getConsumerConnectionList
=
true
;
@RequestCode
(
code
=
204
)
private
boolean
getProducerConnectionList
=
true
;
@RequestCode
(
code
=
208
)
private
boolean
getConsumeStats
=
true
;
@RequestCode
(
code
=
43
)
private
boolean
getAllConsumerOffset
=
true
;
@RequestCode
(
code
=
25
)
private
boolean
getAllDelayOffset
=
true
;
@RequestCode
(
code
=
222
)
private
boolean
invokeBrokerToresetOffset
=
true
;
@RequestCode
(
code
=
300
)
private
boolean
queryTopicConsumByWho
=
true
;
@RequestCode
(
code
=
301
)
private
boolean
registerFilterServer
=
true
;
@RequestCode
(
code
=
303
)
private
boolean
queryConsumeTimeSpan
=
true
;
@RequestCode
(
code
=
305
)
private
boolean
getSystemTopicListFromBroker
=
true
;
@RequestCode
(
code
=
306
)
private
boolean
cleanExpiredConsumequeue
=
true
;
@RequestCode
(
code
=
316
)
private
boolean
cleanUnusedTopic
=
true
;
@RequestCode
(
code
=
307
)
private
boolean
getConsumerRunningInfo
=
true
;
@RequestCode
(
code
=
308
)
private
boolean
queryCorrectionOffset
=
true
;
@RequestCode
(
code
=
309
)
private
boolean
consumeMessageDirectly
=
true
;
@RequestCode
(
code
=
314
)
private
boolean
cloneGroupOffset
=
true
;
@RequestCode
(
code
=
315
)
private
boolean
viewBrokerStatsData
=
true
;
@RequestCode
(
code
=
317
)
private
boolean
getBrokerConsumeStats
=
true
;
@RequestCode
(
code
=
321
)
private
boolean
queryConsumeQueue
=
true
;
public
BorkerAccessControl
()
{
}
public
Set
<
String
>
getPermitSendTopic
()
{
return
permitSendTopic
;
}
...
...
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParametersEntity.java
浏览文件 @
7c2b40c3
...
...
@@ -16,10 +16,14 @@
*/
package
org.apache.rocketmq.acl.plug.entity
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
public
class
ControllerParametersEntity
{
private
String
fileHome
;
private
Class
<?>
accessContralAnalysisClass
=
RequestCode
.
class
;
public
String
getFileHome
()
{
return
fileHome
;
}
...
...
@@ -28,11 +32,21 @@ public class ControllerParametersEntity {
this
.
fileHome
=
fileHome
;
}
@Override
public
String
toString
()
{
StringBuilder
builder
=
new
StringBuilder
();
builder
.
append
(
"ControllerParametersEntity [fileHome="
).
append
(
fileHome
).
append
(
"]"
);
return
builder
.
toString
();
}
public
Class
<?>
getAccessContralAnalysisClass
()
{
return
accessContralAnalysisClass
;
}
public
void
setAccessContralAnalysisClass
(
Class
<?>
accessContralAnalysisClass
)
{
this
.
accessContralAnalysisClass
=
accessContralAnalysisClass
;
}
@Override
public
String
toString
()
{
StringBuilder
builder
=
new
StringBuilder
();
builder
.
append
(
"ControllerParametersEntity [fileHome="
).
append
(
fileHome
).
append
(
", accessContralAnalysisClass="
)
.
append
(
accessContralAnalysisClass
).
append
(
"]"
);
return
builder
.
toString
();
}
}
acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/MultipleNetaddressStrategy.java
浏览文件 @
7c2b40c3
...
...
@@ -18,7 +18,6 @@ package org.apache.rocketmq.acl.plug.strategy;
import
java.util.HashSet
;
import
java.util.Set
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
public
class
MultipleNetaddressStrategy
extends
AbstractNetaddressStrategy
{
...
...
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AccessContralAnalysisTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug
;
import
java.util.Iterator
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
import
org.apache.rocketmq.acl.plug.entity.BorkerAccessControl
;
import
org.apache.rocketmq.acl.plug.exception.AclPlugAccountAnalysisException
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
public
class
AccessContralAnalysisTest
{
AccessContralAnalysis
accessContralAnalysis
=
new
AccessContralAnalysis
();
@Before
public
void
init
()
{
accessContralAnalysis
.
analysisClass
(
RequestCode
.
class
);
}
@Test
public
void
analysisTest
()
{
AccessContralAnalysis
accessContralAnalysis
=
new
AccessContralAnalysis
();
BorkerAccessControl
accessControl
=
new
BorkerAccessControl
();
accessControl
.
setSendMessage
(
false
);
Map
<
Integer
,
Boolean
>
map
=
accessContralAnalysis
.
analysis
(
accessControl
);
...
...
@@ -27,7 +53,13 @@ public class AccessContralAnalysisTest {
}
}
Assert
.
assertEquals
(
num
,
1
);
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
analysisExceptionTest
(){
AccessControl
accessControl
=
new
AccessControl
();
accessContralAnalysis
.
analysis
(
accessControl
);
}
}
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclPlugControllerTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug
;
public
class
AclPlugControllerTest
{
...
...
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.List
;
import
org.apache.commons.lang3.StringUtils
;
import
org.junit.Assert
;
import
org.junit.Test
;
...
...
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AuthenticationTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Set
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
import
org.apache.rocketmq.acl.plug.entity.BorkerAccessControl
;
...
...
@@ -18,11 +33,16 @@ public class AuthenticationTest {
Authentication
authentication
=
new
Authentication
();
AuthenticationInfo
authenticationInfo
;
BorkerAccessControl
borkerAccessControl
;
AuthenticationResult
authenticationResult
=
new
AuthenticationResult
();
LoginOrRequestAccessControl
loginOrRequestAccessControl
=
new
LoginOrRequestAccessControl
();
@Before
public
void
init
()
{
OneNetaddressStrategy
netaddressStrategy
=
new
OneNetaddressStrategy
(
"127.0.0.1"
);
BorkerAccessControl
borkerAccessControl
=
new
BorkerAccessControl
();
borkerAccessControl
=
new
BorkerAccessControl
();
//321
borkerAccessControl
.
setQueryConsumeQueue
(
false
);
...
...
@@ -51,8 +71,7 @@ public class AuthenticationTest {
@Test
public
void
authenticationTest
()
{
AuthenticationResult
authenticationResult
=
new
AuthenticationResult
();
LoginOrRequestAccessControl
loginOrRequestAccessControl
=
new
LoginOrRequestAccessControl
();
loginOrRequestAccessControl
.
setCode
(
317
);
boolean
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
...
...
@@ -81,7 +100,7 @@ public class AuthenticationTest {
loginOrRequestAccessControl
.
setTopic
(
"nopermitSendTopic"
);
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assert
Tru
e
(
isReturn
);
Assert
.
assert
Fals
e
(
isReturn
);
loginOrRequestAccessControl
.
setCode
(
11
);
loginOrRequestAccessControl
.
setTopic
(
"permitPullTopic"
);
...
...
@@ -94,7 +113,29 @@ public class AuthenticationTest {
loginOrRequestAccessControl
.
setTopic
(
"nopermitPullTopic"
);
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assert
Tru
e
(
isReturn
);
Assert
.
assert
Fals
e
(
isReturn
);
}
@Test
public
void
isEmptyTest
()
{
loginOrRequestAccessControl
.
setCode
(
10
);
loginOrRequestAccessControl
.
setTopic
(
"absentTopic"
);
boolean
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assertFalse
(
isReturn
);
Set
<
String
>
permitSendTopic
=
new
HashSet
<>();
borkerAccessControl
.
setPermitSendTopic
(
permitSendTopic
);
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assertTrue
(
isReturn
);
loginOrRequestAccessControl
.
setCode
(
11
);
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assertFalse
(
isReturn
);
borkerAccessControl
.
setPermitPullTopic
(
permitSendTopic
);
isReturn
=
authentication
.
authentication
(
authenticationInfo
,
loginOrRequestAccessControl
,
authenticationResult
);
Assert
.
assertTrue
(
isReturn
);
}
}
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug.engine
;
import
java.io.File
;
import
java.io.FileInputStream
;
import
java.io.FileNotFoundException
;
import
java.lang.reflect.Field
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationInfo
;
import
org.apache.rocketmq.acl.plug.entity.AuthenticationResult
;
...
...
@@ -54,13 +68,13 @@ public class PlainAclPlugEngineTest {
accessControl
=
new
BorkerAccessControl
();
accessControl
.
setAccount
(
"rokcetmq"
);
accessControl
.
setPassword
(
"aliyun"
);
accessControl
.
setPassword
(
"aliyun
11
"
);
accessControl
.
setNetaddress
(
"127.0.0.1"
);
accessControl
.
setRecognition
(
"127.0.0.1:1"
);
accessControlTwo
=
new
BorkerAccessControl
();
accessControlTwo
.
setAccount
(
"rokcet"
);
accessControlTwo
.
setPassword
(
"aliyun"
);
accessControlTwo
.
setAccount
(
"rokcet
1
"
);
accessControlTwo
.
setPassword
(
"aliyun
1
"
);
accessControlTwo
.
setNetaddress
(
"127.0.0.1"
);
accessControlTwo
.
setRecognition
(
"127.0.0.1:2"
);
...
...
@@ -69,6 +83,31 @@ public class PlainAclPlugEngineTest {
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
accountNullTest
()
{
accessControl
.
setAccount
(
null
);
plainAclPlugEngine
.
setAccessControl
(
accessControl
);
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
accountThanTest
()
{
accessControl
.
setAccount
(
"123"
);
plainAclPlugEngine
.
setAccessControl
(
accessControl
);
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
passWordtNullTest
()
{
accessControl
.
setAccount
(
null
);
plainAclPlugEngine
.
setAccessControl
(
accessControl
);
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
passWordThanTest
()
{
accessControl
.
setAccount
(
"123"
);
plainAclPlugEngine
.
setAccessControl
(
accessControl
);
}
@Test
(
expected
=
AclPlugAccountAnalysisException
.
class
)
public
void
testPlainAclPlugEngineInit
()
{
ControllerParametersEntity
controllerParametersEntity
=
new
ControllerParametersEntity
();
...
...
@@ -88,7 +127,7 @@ public class PlainAclPlugEngineTest {
AccessControl
testAccessControl
=
new
AccessControl
();
testAccessControl
.
setAccount
(
"rokcetmq"
);
testAccessControl
.
setPassword
(
"aliyun"
);
testAccessControl
.
setPassword
(
"aliyun
11
"
);
testAccessControl
.
setNetaddress
(
"127.0.0.1"
);
testAccessControl
.
setRecognition
(
"127.0.0.1:1"
);
...
...
@@ -97,7 +136,7 @@ public class PlainAclPlugEngineTest {
Assert
.
assertNull
(
authenticationInfo
);
testAccessControl
.
setAccount
(
"rokcetmq"
);
testAccessControl
.
setPassword
(
"1"
);
testAccessControl
.
setPassword
(
"1
234567
"
);
authenticationInfo
=
aclPlugEngine
.
getAccessControl
(
testAccessControl
);
Assert
.
assertNull
(
authenticationInfo
);
...
...
@@ -128,6 +167,8 @@ public class PlainAclPlugEngineTest {
public
void
setNetaddressAccessControl
()
{
AuthenticationInfoManagementAclPlugEngine
aclPlugEngine
=
(
AuthenticationInfoManagementAclPlugEngine
)
plainAclPlugEngine
;
AccessControl
accessControl
=
new
BorkerAccessControl
();
accessControl
.
setAccount
(
"RocketMQ"
);
accessControl
.
setPassword
(
"RocketMQ"
);
accessControl
.
setNetaddress
(
"127.0.0.1"
);
aclPlugEngine
.
setAccessControl
(
accessControl
);
aclPlugEngine
.
setNetaddressAccessControl
(
accessControl
);
...
...
@@ -162,6 +203,8 @@ public class PlainAclPlugEngineTest {
AuthenticationInfoManagementAclPlugEngine
aclPlugEngine
=
(
AuthenticationInfoManagementAclPlugEngine
)
plainAclPlugEngine
;
AccessControl
accessControl
=
new
BorkerAccessControl
();
accessControl
.
setAccount
(
"RocketMQ"
);
accessControl
.
setPassword
(
"RocketMQ"
);
accessControl
.
setNetaddress
(
"127.0.0.1"
);
aclPlugEngine
.
setAccessControl
(
accessControl
);
AuthenticationInfo
authenticationInfo
=
aclPlugEngine
.
getAccessControl
(
accessControl
);
...
...
@@ -201,7 +244,7 @@ public class PlainAclPlugEngineTest {
public
void
getAuthenticationInfo
()
{
LoginOrRequestAccessControl
loginOrRequestAccessControl
=
new
LoginOrRequestAccessControl
();
loginOrRequestAccessControl
.
setAccount
(
"rokcetmq"
);
loginOrRequestAccessControl
.
setPassword
(
"aliyun"
);
loginOrRequestAccessControl
.
setPassword
(
"aliyun
11
"
);
loginOrRequestAccessControl
.
setNetaddress
(
"127.0.0.1"
);
loginOrRequestAccessControl
.
setRecognition
(
"127.0.0.1:1"
);
...
...
acl-plug/src/test/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyTest.java
浏览文件 @
7c2b40c3
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.acl.plug.strategy
;
import
org.apache.rocketmq.acl.plug.entity.AccessControl
;
...
...
acl-plug/src/test/resources/conf/transport.yml
浏览文件 @
7c2b40c3
...
...
@@ -4,14 +4,14 @@ onlyNetAddress:
-
broker-a
list
:
-
account
:
laohu
password
:
123456
-
account
:
rocketMQ
password
:
123456
7
netaddress
:
192.0.0.*
permitSendTopic
:
-
test1
-
test2
-
account
:
laohu
password
:
123456
-
account
:
rocketMQ
password
:
123456
7
netaddress
:
192.0.2.1
permitSendTopic
:
-
test3
...
...
broker/pom.xml
浏览文件 @
7c2b40c3
...
...
@@ -9,81 +9,81 @@
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project
xmlns
=
"http://maven.apache.org/POM/4.0.0
"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance
"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-all
</artifactId>
<version>
4.4.0-SNAPSHOT
</version>
</parent>
<project
xmlns
:xsi=
"http://www.w3.org/2001/XMLSchema-instance
"
xmlns=
"http://maven.apache.org/POM/4.0.0
"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-all
</artifactId>
<version>
4.4.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<packaging>
jar
</packaging>
<artifactId>
rocketmq-broker
</artifactId>
<name>
rocketmq-broker ${project.version}
</name>
<modelVersion>
4.0.0
</modelVersion>
<packaging>
jar
</packaging>
<artifactId>
rocketmq-broker
</artifactId>
<name>
rocketmq-broker ${project.version}
</name>
<dependencies>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-common
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-store
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-remoting
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-client
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-srvutil
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-filter
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-acl-plug
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-core
</artifactId>
</dependency>
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
</dependency>
<dependency>
<groupId>
org.javassist
</groupId>
<artifactId>
javassist
</artifactId>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-common
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-store
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-remoting
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-client
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-srvutil
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-filter
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-acl-plug
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-core
</artifactId>
</dependency>
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
</dependency>
<dependency>
<groupId>
org.javassist
</groupId>
<artifactId>
javassist
</artifactId>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>
maven-surefire-plugin
</artifactId>
<version>
2.19.1
</version>
<configuration>
<forkCount>
1
</forkCount>
<reuseForks>
false
</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<artifactId>
maven-surefire-plugin
</artifactId>
<version>
2.19.1
</version>
<configuration>
<forkCount>
1
</forkCount>
<reuseForks>
false
</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
7c2b40c3
...
...
@@ -302,7 +302,6 @@ public class BrokerController {
this
.
heartbeatThreadPoolQueue
,
new
ThreadFactoryImpl
(
"HeartbeatThread_"
,
true
));
this
.
endTransactionExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getEndTransactionThreadPoolNums
(),
this
.
brokerConfig
.
getEndTransactionThreadPoolNums
(),
...
...
@@ -1101,12 +1100,12 @@ public class BrokerController {
this
.
transactionalMessageCheckListener
=
transactionalMessageCheckListener
;
}
<<<<<<<
HEAD
public
AclPlugController
getAclPlugController
()
{
return
this
.
aclPlugController
;
=======
}
public
BlockingQueue
<
Runnable
>
getEndTransactionThreadPoolQueue
()
{
return
endTransactionThreadPoolQueue
;
>>>>>>>
53
a63460d3a1599a6c51058bb51a73746233022d
}
}
broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
浏览文件 @
7c2b40c3
...
...
@@ -72,7 +72,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
this
.
brokerController
.
getProducerManager
().
doChannelCloseEvent
(
remoteAddr
,
channel
);
this
.
brokerController
.
getConsumerManager
().
doChannelCloseEvent
(
remoteAddr
,
channel
);
this
.
brokerController
.
getFilterServerManager
().
doChannelCloseEvent
(
remoteAddr
,
channel
);
this
.
brokerController
.
getAclPlugController
().
doChannelCloseEvent
(
remoteAddr
);
if
(
this
.
brokerController
.
getAclPlugController
()
!=
null
&&
this
.
brokerController
.
getAclPlugController
().
isStartSucceed
())
{
this
.
brokerController
.
getAclPlugController
().
doChannelCloseEvent
(
remoteAddr
);
}
}
@Override
...
...
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
浏览文件 @
7c2b40c3
...
...
@@ -70,7 +70,6 @@ public class BrokerConfig {
*/
private
int
endTransactionThreadPoolNums
=
8
+
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
private
int
flushConsumerOffsetInterval
=
1000
*
5
;
private
int
flushConsumerOffsetHistoryInterval
=
1000
*
60
;
...
...
@@ -174,6 +173,16 @@ public class BrokerConfig {
private
boolean
isAclPlug
;
public
static
String
localHostName
()
{
try
{
return
InetAddress
.
getLocalHost
().
getHostName
();
}
catch
(
UnknownHostException
e
)
{
log
.
error
(
"Failed to obtain the host name"
,
e
);
}
return
"DEFAULT_BROKER"
;
}
public
boolean
isTraceOn
()
{
return
traceOn
;
}
...
...
@@ -238,16 +247,6 @@ public class BrokerConfig {
this
.
slaveReadEnable
=
slaveReadEnable
;
}
public
static
String
localHostName
()
{
try
{
return
InetAddress
.
getLocalHost
().
getHostName
();
}
catch
(
UnknownHostException
e
)
{
log
.
error
(
"Failed to obtain the host name"
,
e
);
}
return
"DEFAULT_BROKER"
;
}
public
int
getRegisterBrokerTimeoutMills
()
{
return
registerBrokerTimeoutMills
;
}
...
...
@@ -712,7 +711,6 @@ public class BrokerConfig {
this
.
transactionCheckInterval
=
transactionCheckInterval
;
}
public
boolean
isAclPlug
()
{
return
isAclPlug
;
}
...
...
distribution/conf/transport.yml
浏览文件 @
7c2b40c3
...
...
@@ -4,14 +4,14 @@ onlyNetAddress:
-
broker-a
list
:
-
account
:
laohu
password
:
123456
-
account
:
RocketMQ
password
:
123456
7
netaddress
:
192.0.0.*
permitSendTopic
:
-
test1
-
test2
-
account
:
laohu
password
:
123456
-
account
:
RocketMQ
password
:
123456
7
netaddress
:
192.0.2.1
permitSendTopic
:
-
test3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录