Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
8d964f45
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
8d964f45
编写于
3月 16, 2020
作者:
C
coder-zzzz
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
close #1848
上级
d1b4e47c
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
365 addition
and
21 deletion
+365
-21
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
...java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+24
-0
client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
.../org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+27
-1
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
...java/org/apache/rocketmq/common/protocol/RequestCode.java
+2
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java
...col/header/namesrv/AddWritePermOfBrokerRequestHeader.java
+39
-0
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java
...ol/header/namesrv/AddWritePermOfBrokerResponseHeader.java
+38
-0
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
...e/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+22
-0
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
...g/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+26
-13
namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
...ache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+46
-6
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
...va/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+5
-0
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+6
-0
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
...main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+3
-0
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
...ava/org/apache/rocketmq/tools/command/MQAdminStartup.java
+2
-0
tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java
...ocketmq/tools/command/namesrv/AddWritePermSubCommand.java
+80
-0
tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
...cketmq/tools/command/namesrv/WipeWritePermSubCommand.java
+1
-1
tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
...rg/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+7
-0
tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java
...tmq/tools/command/namesrv/AddWritePermSubCommandTest.java
+37
-0
未找到文件。
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
浏览文件 @
8d964f45
...
...
@@ -135,6 +135,8 @@ import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRe
import
org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader
;
...
...
@@ -1432,6 +1434,28 @@ public class MQClientAPIImpl {
throw
new
MQClientException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
int
addWritePermOfBroker
(
final
String
nameSrvAddr
,
String
brokerName
,
final
long
timeoutMillis
)
throws
RemotingCommandException
,
RemotingConnectException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
InterruptedException
,
MQClientException
{
AddWritePermOfBrokerRequestHeader
requestHeader
=
new
AddWritePermOfBrokerRequestHeader
();
requestHeader
.
setBrokerName
(
brokerName
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
ADD_WRITE_PERM_OF_BROKER
,
requestHeader
);
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
nameSrvAddr
,
request
,
timeoutMillis
);
assert
response
!=
null
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
SUCCESS
:
{
AddWritePermOfBrokerResponseHeader
responseHeader
=
(
AddWritePermOfBrokerResponseHeader
)
response
.
decodeCommandCustomHeader
(
AddWritePermOfBrokerResponseHeader
.
class
);
return
responseHeader
.
getAddTopicCount
();
}
default
:
break
;
}
throw
new
MQClientException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
void
deleteTopicInBroker
(
final
String
addr
,
final
String
topic
,
final
long
timeoutMillis
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
,
MQClientException
{
DeleteTopicRequestHeader
requestHeader
=
new
DeleteTopicRequestHeader
();
...
...
client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
浏览文件 @
8d964f45
...
...
@@ -16,7 +16,6 @@
*/
package
org.apache.rocketmq.client.impl
;
import
java.lang.reflect.Field
;
import
org.apache.rocketmq.client.ClientConfig
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
...
...
@@ -29,9 +28,11 @@ import org.apache.rocketmq.client.producer.SendStatus;
import
org.apache.rocketmq.common.PlainAccessConfig
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader
;
import
org.apache.rocketmq.remoting.InvokeCallback
;
import
org.apache.rocketmq.remoting.RemotingClient
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
...
...
@@ -48,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock;
import
org.mockito.junit.MockitoJUnitRunner
;
import
org.mockito.stubbing.Answer
;
import
java.lang.reflect.Field
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
assertj
.
core
.
api
.
Fail
.
failBecauseExceptionWasNotThrown
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
...
...
@@ -442,4 +445,27 @@ public class MQClientAPIImplTest {
requestHeader
.
setMaxReconsumeTimes
(
10
);
return
requestHeader
;
}
@Test
public
void
testAddWritePermOfBroker
()
throws
Exception
{
doAnswer
(
new
Answer
()
{
@Override
public
Object
answer
(
InvocationOnMock
invocationOnMock
)
throws
Throwable
{
RemotingCommand
request
=
invocationOnMock
.
getArgument
(
1
);
if
(
request
.
getCode
()
!=
RequestCode
.
ADD_WRITE_PERM_OF_BROKER
)
{
return
null
;
}
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
AddWritePermOfBrokerResponseHeader
.
class
);
AddWritePermOfBrokerResponseHeader
responseHeader
=
(
AddWritePermOfBrokerResponseHeader
)
response
.
readCustomHeader
();
response
.
setCode
(
ResponseCode
.
SUCCESS
);
responseHeader
.
setAddTopicCount
(
7
);
response
.
addExtField
(
"addTopicCount"
,
String
.
valueOf
(
responseHeader
.
getAddTopicCount
()));
return
response
;
}
}).
when
(
remotingClient
).
invokeSync
(
anyString
(),
any
(
RemotingCommand
.
class
),
anyLong
());
int
topicCnt
=
mqClientAPI
.
addWritePermOfBroker
(
"127.0.0.1"
,
"default-broker"
,
1000
);
assertThat
(
topicCnt
).
isEqualTo
(
7
);
}
}
\ No newline at end of file
common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
浏览文件 @
8d964f45
...
...
@@ -188,4 +188,6 @@ public class RequestCode {
public
static
final
int
SEND_REPLY_MESSAGE_V2
=
325
;
public
static
final
int
PUSH_REPLY_MESSAGE_TO_CLIENT
=
326
;
public
static
final
int
ADD_WRITE_PERM_OF_BROKER
=
327
;
}
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java
0 → 100644
浏览文件 @
8d964f45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.protocol.header.namesrv
;
import
org.apache.rocketmq.remoting.CommandCustomHeader
;
import
org.apache.rocketmq.remoting.annotation.CFNotNull
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
public
class
AddWritePermOfBrokerRequestHeader
implements
CommandCustomHeader
{
@CFNotNull
private
String
brokerName
;
@Override
public
void
checkFields
()
throws
RemotingCommandException
{
}
public
String
getBrokerName
()
{
return
brokerName
;
}
public
void
setBrokerName
(
String
brokerName
)
{
this
.
brokerName
=
brokerName
;
}
}
common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java
0 → 100644
浏览文件 @
8d964f45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.common.protocol.header.namesrv
;
import
org.apache.rocketmq.remoting.CommandCustomHeader
;
import
org.apache.rocketmq.remoting.annotation.CFNotNull
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
public
class
AddWritePermOfBrokerResponseHeader
implements
CommandCustomHeader
{
@CFNotNull
private
Integer
addTopicCount
;
@Override
public
void
checkFields
()
throws
RemotingCommandException
{
}
public
Integer
getAddTopicCount
()
{
return
addTopicCount
;
}
public
void
setAddTopicCount
(
Integer
addTopicCount
)
{
this
.
addTopicCount
=
addTopicCount
;
}
}
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
浏览文件 @
8d964f45
...
...
@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.namesrv.NamesrvUtil
;
...
...
@@ -102,6 +104,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return
this
.
getBrokerClusterInfo
(
ctx
,
request
);
case
RequestCode
.
WIPE_WRITE_PERM_OF_BROKER
:
return
this
.
wipeWritePermOfBroker
(
ctx
,
request
);
case
RequestCode
.
ADD_WRITE_PERM_OF_BROKER
:
return
this
.
addWritePermOfBroker
(
ctx
,
request
);
case
RequestCode
.
GET_ALL_TOPIC_LIST_FROM_NAMESERVER
:
return
getAllTopicListFromNameserver
(
ctx
,
request
);
case
RequestCode
.
DELETE_TOPIC_IN_NAMESRV
:
...
...
@@ -394,6 +398,24 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return
response
;
}
private
RemotingCommand
addWritePermOfBroker
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
AddWritePermOfBrokerResponseHeader
.
class
);
final
AddWritePermOfBrokerResponseHeader
responseHeader
=
(
AddWritePermOfBrokerResponseHeader
)
response
.
readCustomHeader
();
final
AddWritePermOfBrokerRequestHeader
requestHeader
=
(
AddWritePermOfBrokerRequestHeader
)
request
.
decodeCommandCustomHeader
(
AddWritePermOfBrokerRequestHeader
.
class
);
int
addTopicCnt
=
this
.
namesrvController
.
getRouteInfoManager
().
addWritePermOfBrokerByLock
(
requestHeader
.
getBrokerName
());
log
.
info
(
"add write perm of broker[{}], client: {}, {}"
,
requestHeader
.
getBrokerName
(),
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()),
addTopicCnt
);
responseHeader
.
setAddTopicCount
(
addTopicCnt
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
private
RemotingCommand
getAllTopicListFromNameserver
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
...
...
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
浏览文件 @
8d964f45
...
...
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.namesrv.RegisterBrokerResult
;
...
...
@@ -252,40 +253,52 @@ public class RouteInfoManager {
}
public
int
wipeWritePermOfBrokerByLock
(
final
String
brokerName
)
{
return
operateWritePermOfBrokerByLock
(
brokerName
,
RequestCode
.
WIPE_WRITE_PERM_OF_BROKER
);
}
public
int
addWritePermOfBrokerByLock
(
final
String
brokerName
)
{
return
operateWritePermOfBrokerByLock
(
brokerName
,
RequestCode
.
ADD_WRITE_PERM_OF_BROKER
);
}
private
int
operateWritePermOfBrokerByLock
(
final
String
brokerName
,
final
int
requestCode
)
{
try
{
try
{
this
.
lock
.
writeLock
().
lockInterruptibly
();
return
wipeWritePermOfBroker
(
brokerNam
e
);
return
operateWritePermOfBroker
(
brokerName
,
requestCod
e
);
}
finally
{
this
.
lock
.
writeLock
().
unlock
();
}
}
catch
(
Exception
e
)
{
log
.
error
(
"
wip
eWritePermOfBrokerByLock Exception"
,
e
);
log
.
error
(
"
operat
eWritePermOfBrokerByLock Exception"
,
e
);
}
return
0
;
}
private
int
wipeWritePermOfBroker
(
final
String
brokerName
)
{
int
wipeTopicCnt
=
0
;
Iterator
<
Entry
<
String
,
List
<
QueueData
>>>
itTopic
=
this
.
topicQueueTable
.
entrySet
().
iterator
();
while
(
itTopic
.
hasNext
())
{
Entry
<
String
,
List
<
QueueData
>>
entry
=
itTopic
.
next
();
private
int
operateWritePermOfBroker
(
final
String
brokerName
,
final
int
requestCode
)
{
int
topicCnt
=
0
;
for
(
Entry
<
String
,
List
<
QueueData
>>
entry
:
this
.
topicQueueTable
.
entrySet
())
{
List
<
QueueData
>
qdList
=
entry
.
getValue
();
Iterator
<
QueueData
>
it
=
qdList
.
iterator
();
while
(
it
.
hasNext
())
{
QueueData
qd
=
it
.
next
();
for
(
QueueData
qd
:
qdList
)
{
if
(
qd
.
getBrokerName
().
equals
(
brokerName
))
{
int
perm
=
qd
.
getPerm
();
perm
&=
~
PermName
.
PERM_WRITE
;
switch
(
requestCode
)
{
case
RequestCode
.
WIPE_WRITE_PERM_OF_BROKER
:
perm
&=
~
PermName
.
PERM_WRITE
;
break
;
case
RequestCode
.
ADD_WRITE_PERM_OF_BROKER
:
perm
=
PermName
.
PERM_READ
|
PermName
.
PERM_WRITE
;
break
;
}
qd
.
setPerm
(
perm
);
wipeT
opicCnt
++;
t
opicCnt
++;
}
}
}
return
wipeT
opicCnt
;
return
t
opicCnt
;
}
public
void
unregisterBroker
(
...
...
namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
浏览文件 @
8d964f45
...
...
@@ -17,17 +17,23 @@
package
org.apache.rocketmq.namesrv.routeinfo
;
import
io.netty.channel.Channel
;
import
java.util.ArrayList
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.namesrv.RegisterBrokerResult
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.route.QueueData
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
java.lang.reflect.Field
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.concurrent.ConcurrentHashMap
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
mock
;
...
...
@@ -74,14 +80,28 @@ public class RouteInfoManagerTest {
topicConfigSerializeWrapper
.
setTopicConfigTable
(
topicConfigConcurrentHashMap
);
Channel
channel
=
mock
(
Channel
.
class
);
RegisterBrokerResult
registerBrokerResult
=
routeInfoManager
.
registerBroker
(
"default-cluster"
,
"127.0.0.1:10911"
,
"default-broker"
,
1234
,
"127.0.0.1:1001"
,
topicConfigSerializeWrapper
,
new
ArrayList
<
String
>(),
channel
);
topicConfigSerializeWrapper
,
new
ArrayList
<
String
>(),
channel
);
assertThat
(
registerBrokerResult
).
isNotNull
();
}
@Test
public
void
testWipeWritePermOfBrokerByLock
()
{
int
result
=
routeInfoManager
.
wipeWritePermOfBrokerByLock
(
"default-broker"
);
assertThat
(
result
).
isEqualTo
(
0
);
public
void
testWipeWritePermOfBrokerByLock
()
throws
Exception
{
List
<
QueueData
>
qdList
=
new
ArrayList
<>();
QueueData
qd
=
new
QueueData
();
qd
.
setPerm
(
PermName
.
PERM_READ
|
PermName
.
PERM_WRITE
);
qd
.
setBrokerName
(
"broker-a"
);
qdList
.
add
(
qd
);
HashMap
<
String
,
List
<
QueueData
>>
topicQueueTable
=
new
HashMap
<>();
topicQueueTable
.
put
(
"topic-a"
,
qdList
);
Field
filed
=
RouteInfoManager
.
class
.
getDeclaredField
(
"topicQueueTable"
);
filed
.
setAccessible
(
true
);
filed
.
set
(
routeInfoManager
,
topicQueueTable
);
int
addTopicCnt
=
routeInfoManager
.
wipeWritePermOfBrokerByLock
(
"broker-a"
);
assertThat
(
addTopicCnt
).
isEqualTo
(
1
);
assertThat
(
qd
.
getPerm
()).
isEqualTo
(
PermName
.
PERM_READ
);
}
@Test
...
...
@@ -119,4 +139,24 @@ public class RouteInfoManagerTest {
byte
[]
topicList
=
routeInfoManager
.
getHasUnitSubUnUnitTopicList
();
assertThat
(
topicList
).
isNotNull
();
}
@Test
public
void
testAddWritePermOfBrokerByLock
()
throws
Exception
{
List
<
QueueData
>
qdList
=
new
ArrayList
<>();
QueueData
qd
=
new
QueueData
();
qd
.
setPerm
(
PermName
.
PERM_READ
);
qd
.
setBrokerName
(
"broker-a"
);
qdList
.
add
(
qd
);
HashMap
<
String
,
List
<
QueueData
>>
topicQueueTable
=
new
HashMap
<>();
topicQueueTable
.
put
(
"topic-a"
,
qdList
);
Field
filed
=
RouteInfoManager
.
class
.
getDeclaredField
(
"topicQueueTable"
);
filed
.
setAccessible
(
true
);
filed
.
set
(
routeInfoManager
,
topicQueueTable
);
int
addTopicCnt
=
routeInfoManager
.
addWritePermOfBrokerByLock
(
"broker-a"
);
assertThat
(
addTopicCnt
).
isEqualTo
(
1
);
assertThat
(
qd
.
getPerm
()).
isEqualTo
(
PermName
.
PERM_READ
|
PermName
.
PERM_WRITE
);
}
}
\ No newline at end of file
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
浏览文件 @
8d964f45
...
...
@@ -282,6 +282,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return
defaultMQAdminExtImpl
.
wipeWritePermOfBroker
(
namesrvAddr
,
brokerName
);
}
@Override
public
int
addWritePermOfBroker
(
String
namesrvAddr
,
String
brokerName
)
throws
RemotingCommandException
,
RemotingConnectException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
InterruptedException
,
MQClientException
{
return
defaultMQAdminExtImpl
.
addWritePermOfBroker
(
namesrvAddr
,
brokerName
);
}
@Override
public
void
putKVConfig
(
String
namespace
,
String
key
,
String
value
)
{
defaultMQAdminExtImpl
.
putKVConfig
(
namespace
,
key
,
value
);
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
浏览文件 @
8d964f45
...
...
@@ -382,6 +382,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
wipeWritePermOfBroker
(
namesrvAddr
,
brokerName
,
timeoutMillis
);
}
@Override
public
int
addWritePermOfBroker
(
String
namesrvAddr
,
String
brokerName
)
throws
RemotingCommandException
,
RemotingConnectException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
InterruptedException
,
MQClientException
{
return
this
.
mqClientInstance
.
getMQClientAPIImpl
().
addWritePermOfBroker
(
namesrvAddr
,
brokerName
,
timeoutMillis
);
}
@Override
public
void
putKVConfig
(
String
namespace
,
String
key
,
String
value
)
{
}
...
...
tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
浏览文件 @
8d964f45
...
...
@@ -134,6 +134,9 @@ public interface MQAdminExt extends MQAdmin {
int
wipeWritePermOfBroker
(
final
String
namesrvAddr
,
String
brokerName
)
throws
RemotingCommandException
,
RemotingConnectException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
InterruptedException
,
MQClientException
;
int
addWritePermOfBroker
(
final
String
namesrvAddr
,
String
brokerName
)
throws
RemotingCommandException
,
RemotingConnectException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
InterruptedException
,
MQClientException
;
void
putKVConfig
(
final
String
namespace
,
final
String
key
,
final
String
value
);
String
getKVConfig
(
final
String
namespace
,
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
浏览文件 @
8d964f45
...
...
@@ -60,6 +60,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
import
org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand
;
import
org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand
;
import
org.apache.rocketmq.tools.command.message.SendMessageCommand
;
import
org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand
;
import
org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand
;
import
org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand
;
import
org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand
;
...
...
@@ -183,6 +184,7 @@ public class MQAdminStartup {
initCommand
(
new
DeleteKvConfigCommand
());
initCommand
(
new
WipeWritePermSubCommand
());
initCommand
(
new
AddWritePermSubCommand
());
initCommand
(
new
ResetOffsetByTimeCommand
());
initCommand
(
new
UpdateOrderConfCommand
());
...
...
tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java
0 → 100644
浏览文件 @
8d964f45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.namesrv
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.apache.rocketmq.tools.command.SubCommand
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
import
java.util.List
;
public
class
AddWritePermSubCommand
implements
SubCommand
{
@Override
public
String
commandName
()
{
return
"addWritePerm"
;
}
@Override
public
String
commandDesc
()
{
return
"Add write perm of broker in all name server you defined in the -n param"
;
}
@Override
public
Options
buildCommandlineOptions
(
Options
options
)
{
Option
opt
=
new
Option
(
"b"
,
"brokerName"
,
true
,
"broker name"
);
opt
.
setRequired
(
true
);
options
.
addOption
(
opt
);
return
options
;
}
@Override
public
void
execute
(
CommandLine
commandLine
,
Options
options
,
RPCHook
rpcHook
)
throws
SubCommandException
{
DefaultMQAdminExt
defaultMQAdminExt
=
new
DefaultMQAdminExt
(
rpcHook
);
defaultMQAdminExt
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
try
{
defaultMQAdminExt
.
start
();
String
brokerName
=
commandLine
.
getOptionValue
(
'b'
).
trim
();
List
<
String
>
namesrvList
=
defaultMQAdminExt
.
getNameServerAddressList
();
if
(
namesrvList
!=
null
)
{
for
(
String
namesrvAddr
:
namesrvList
)
{
try
{
int
addTopicCount
=
defaultMQAdminExt
.
addWritePermOfBroker
(
namesrvAddr
,
brokerName
);
System
.
out
.
printf
(
"add write perm of broker[%s] in name server[%s] OK, %d%n"
,
brokerName
,
namesrvAddr
,
addTopicCount
);
}
catch
(
Exception
e
)
{
System
.
out
.
printf
(
"add write perm of broker[%s] in name server[%s] Failed%n"
,
brokerName
,
namesrvAddr
);
e
.
printStackTrace
();
}
}
}
}
catch
(
Exception
e
)
{
throw
new
SubCommandException
(
this
.
getClass
().
getSimpleName
()
+
"command failed"
,
e
);
}
finally
{
defaultMQAdminExt
.
shutdown
();
}
}
}
tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
浏览文件 @
8d964f45
...
...
@@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand {
@Override
public
String
commandDesc
()
{
return
"Wipe write perm of broker in all name server"
;
return
"Wipe write perm of broker in all name server
you defined in the -n param
"
;
}
@Override
...
...
tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
浏览文件 @
8d964f45
...
...
@@ -209,6 +209,7 @@ public class DefaultMQAdminExtTest {
when
(
mQClientAPIImpl
.
getProducerConnectionList
(
anyString
(),
anyString
(),
anyLong
())).
thenReturn
(
producerConnection
);
when
(
mQClientAPIImpl
.
wipeWritePermOfBroker
(
anyString
(),
anyString
(),
anyLong
())).
thenReturn
(
6
);
when
(
mQClientAPIImpl
.
addWritePermOfBroker
(
anyString
(),
anyString
(),
anyLong
())).
thenReturn
(
7
);
TopicStatsTable
topicStatsTable
=
new
TopicStatsTable
();
topicStatsTable
.
setOffsetTable
(
new
HashMap
<
MessageQueue
,
TopicOffset
>());
...
...
@@ -299,6 +300,12 @@ public class DefaultMQAdminExtTest {
assertThat
(
result
).
isEqualTo
(
6
);
}
@Test
public
void
testAddWritePermOfBroker
()
throws
InterruptedException
,
RemotingCommandException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
MQClientException
,
RemotingConnectException
{
int
result
=
defaultMQAdminExt
.
addWritePermOfBroker
(
"127.0.0.1:9876"
,
"default-broker"
);
assertThat
(
result
).
isEqualTo
(
7
);
}
@Test
public
void
testExamineTopicRouteInfo
()
throws
RemotingException
,
MQClientException
,
InterruptedException
{
TopicRouteData
topicRouteData
=
defaultMQAdminExt
.
examineTopicRouteInfo
(
"UnitTest"
);
...
...
tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java
0 → 100644
浏览文件 @
8d964f45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.tools.command.namesrv
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.tools.command.SubCommandException
;
import
org.junit.Test
;
public
class
AddWritePermSubCommandTest
{
@Test
public
void
testExecute
()
throws
SubCommandException
{
AddWritePermSubCommand
cmd
=
new
AddWritePermSubCommand
();
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
String
[]
subargs
=
new
String
[]{
"-b default-broker"
};
final
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"mqadmin "
+
cmd
.
commandName
(),
subargs
,
cmd
.
buildCommandlineOptions
(
options
),
new
PosixParser
());
cmd
.
execute
(
commandLine
,
options
,
null
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录