未验证 提交 f9a0f530 编写于 作者: T tiger lee 提交者: GitHub

Merge pull request #1858 from coder-zzzz/github_features/#1848

[ISSUE #1848] Add write perm admin command 
...@@ -135,6 +135,8 @@ import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRe ...@@ -135,6 +135,8 @@ import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRe
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
...@@ -1428,6 +1430,28 @@ public class MQClientAPIImpl { ...@@ -1428,6 +1430,28 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark()); throw new MQClientException(response.getCode(), response.getRemark());
} }
public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis)
throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(nameSrvAddr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
AddWritePermOfBrokerResponseHeader responseHeader =
(AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
return responseHeader.getAddTopicCount();
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.impl; package org.apache.rocketmq.client.impl;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -29,9 +28,11 @@ import org.apache.rocketmq.client.producer.SendStatus; ...@@ -29,9 +28,11 @@ import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
...@@ -48,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock; ...@@ -48,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
...@@ -442,4 +445,27 @@ public class MQClientAPIImplTest { ...@@ -442,4 +445,27 @@ public class MQClientAPIImplTest {
requestHeader.setMaxReconsumeTimes(10); requestHeader.setMaxReconsumeTimes(10);
return requestHeader; return requestHeader;
} }
@Test
public void testAddWritePermOfBroker() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
RemotingCommand request = invocationOnMock.getArgument(1);
if (request.getCode() != RequestCode.ADD_WRITE_PERM_OF_BROKER) {
return null;
}
RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class);
AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader();
response.setCode(ResponseCode.SUCCESS);
responseHeader.setAddTopicCount(7);
response.addExtField("addTopicCount", String.valueOf(responseHeader.getAddTopicCount()));
return response;
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000);
assertThat(topicCnt).isEqualTo(7);
}
} }
\ No newline at end of file
...@@ -188,4 +188,6 @@ public class RequestCode { ...@@ -188,4 +188,6 @@ public class RequestCode {
public static final int SEND_REPLY_MESSAGE_V2 = 325; public static final int SEND_REPLY_MESSAGE_V2 = 325;
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
public static final int ADD_WRITE_PERM_OF_BROKER = 327;
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddWritePermOfBrokerRequestHeader implements CommandCustomHeader {
@CFNotNull
private String brokerName;
@Override
public void checkFields() throws RemotingCommandException {
}
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AddWritePermOfBrokerResponseHeader implements CommandCustomHeader {
@CFNotNull
private Integer addTopicCount;
@Override
public void checkFields() throws RemotingCommandException {
}
public Integer getAddTopicCount() {
return addTopicCount;
}
public void setAddTopicCount(Integer addTopicCount) {
this.addTopicCount = addTopicCount;
}
}
...@@ -566,6 +566,14 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker ...@@ -566,6 +566,14 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td class=xl67 width=87 style='width:65pt'>-b</td> <td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td> <td class=xl68 width=87 style='width:65pt'>BrokerName</td>
</tr> </tr>
<tr height=57 style='height:43.0pt'>
<td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
height:103.0pt;border-top:none;width:143pt'>addWritePerm</td>
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>从NameServer上添加 Broker写权限</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
</tr>
<tr height=57 style='height:43.0pt'> <tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td> <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td> <td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
......
...@@ -426,6 +426,14 @@ Before introducing the mqadmin management tool, the following points need to be ...@@ -426,6 +426,14 @@ Before introducing the mqadmin management tool, the following points need to be
<td class=xl67 width=87 style='width:65pt'>-b</td> <td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>Declare the BrokerName</td> <td class=xl68 width=87 style='width:65pt'>Declare the BrokerName</td>
</tr> </tr>
<tr height=57 style='height:43.0pt'>
<td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
height:103.0pt;border-top:none;width:143pt'>addWritePerm</td>
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Add write permissions for broker from nameServer</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>Declare the BrokerName</td>
</tr>
<tr height=57 style='height:43.0pt'> <tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td> <td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td> <td class=xl68 width=87 style='width:65pt'>Service address used to specify nameServer and formatted as ip:port</td>
......
...@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.MixAll; ...@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.namesrv.NamesrvUtil;
...@@ -103,6 +105,8 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen ...@@ -103,6 +105,8 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
return this.getBrokerClusterInfo(ctx, request); return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER: case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request); return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
return this.addWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request); return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV: case RequestCode.DELETE_TOPIC_IN_NAMESRV:
...@@ -402,6 +406,24 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen ...@@ -402,6 +406,24 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
return response; return response;
} }
private RemotingCommand addWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class);
final AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader();
final AddWritePermOfBrokerRequestHeader requestHeader = (AddWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(AddWritePermOfBrokerRequestHeader.class);
int addTopicCnt = this.namesrvController.getRouteInfoManager().addWritePermOfBrokerByLock(requestHeader.getBrokerName());
log.info("add write perm of broker[{}], client: {}, {}",
requestHeader.getBrokerName(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
addTopicCnt);
responseHeader.setAddTopicCount(addTopicCnt);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) { private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
......
...@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
...@@ -252,40 +253,52 @@ public class RouteInfoManager { ...@@ -252,40 +253,52 @@ public class RouteInfoManager {
} }
public int wipeWritePermOfBrokerByLock(final String brokerName) { public int wipeWritePermOfBrokerByLock(final String brokerName) {
return operateWritePermOfBrokerByLock(brokerName, RequestCode.WIPE_WRITE_PERM_OF_BROKER);
}
public int addWritePermOfBrokerByLock(final String brokerName) {
return operateWritePermOfBrokerByLock(brokerName, RequestCode.ADD_WRITE_PERM_OF_BROKER);
}
private int operateWritePermOfBrokerByLock(final String brokerName, final int requestCode) {
try { try {
try { try {
this.lock.writeLock().lockInterruptibly(); this.lock.writeLock().lockInterruptibly();
return wipeWritePermOfBroker(brokerName); return operateWritePermOfBroker(brokerName, requestCode);
} finally { } finally {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("wipeWritePermOfBrokerByLock Exception", e); log.error("operateWritePermOfBrokerByLock Exception", e);
} }
return 0; return 0;
} }
private int wipeWritePermOfBroker(final String brokerName) {
int wipeTopicCnt = 0; private int operateWritePermOfBroker(final String brokerName, final int requestCode) {
Iterator<Entry<String, List<QueueData>>> itTopic = this.topicQueueTable.entrySet().iterator(); int topicCnt = 0;
while (itTopic.hasNext()) { for (Entry<String, List<QueueData>> entry : this.topicQueueTable.entrySet()) {
Entry<String, List<QueueData>> entry = itTopic.next();
List<QueueData> qdList = entry.getValue(); List<QueueData> qdList = entry.getValue();
Iterator<QueueData> it = qdList.iterator(); for (QueueData qd : qdList) {
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) { if (qd.getBrokerName().equals(brokerName)) {
int perm = qd.getPerm(); int perm = qd.getPerm();
perm &= ~PermName.PERM_WRITE; switch (requestCode) {
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
perm &= ~PermName.PERM_WRITE;
break;
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
perm = PermName.PERM_READ | PermName.PERM_WRITE;
break;
}
qd.setPerm(perm); qd.setPerm(perm);
wipeTopicCnt++; topicCnt++;
} }
} }
} }
return wipeTopicCnt; return topicCnt;
} }
public void unregisterBroker( public void unregisterBroker(
......
...@@ -17,17 +17,23 @@ ...@@ -17,17 +17,23 @@
package org.apache.rocketmq.namesrv.routeinfo; package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
...@@ -74,14 +80,28 @@ public class RouteInfoManagerTest { ...@@ -74,14 +80,28 @@ public class RouteInfoManagerTest {
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class); Channel channel = mock(Channel.class);
RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001", RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
topicConfigSerializeWrapper, new ArrayList<String>(), channel); topicConfigSerializeWrapper, new ArrayList<String>(), channel);
assertThat(registerBrokerResult).isNotNull(); assertThat(registerBrokerResult).isNotNull();
} }
@Test @Test
public void testWipeWritePermOfBrokerByLock() { public void testWipeWritePermOfBrokerByLock() throws Exception {
int result = routeInfoManager.wipeWritePermOfBrokerByLock("default-broker"); List<QueueData> qdList = new ArrayList<>();
assertThat(result).isEqualTo(0); QueueData qd = new QueueData();
qd.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
qd.setBrokerName("broker-a");
qdList.add(qd);
HashMap<String, List<QueueData>> topicQueueTable = new HashMap<>();
topicQueueTable.put("topic-a", qdList);
Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable");
filed.setAccessible(true);
filed.set(routeInfoManager, topicQueueTable);
int addTopicCnt = routeInfoManager.wipeWritePermOfBrokerByLock("broker-a");
assertThat(addTopicCnt).isEqualTo(1);
assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ);
} }
@Test @Test
...@@ -119,4 +139,24 @@ public class RouteInfoManagerTest { ...@@ -119,4 +139,24 @@ public class RouteInfoManagerTest {
byte[] topicList = routeInfoManager.getHasUnitSubUnUnitTopicList(); byte[] topicList = routeInfoManager.getHasUnitSubUnUnitTopicList();
assertThat(topicList).isNotNull(); assertThat(topicList).isNotNull();
} }
@Test
public void testAddWritePermOfBrokerByLock() throws Exception {
List<QueueData> qdList = new ArrayList<>();
QueueData qd = new QueueData();
qd.setPerm(PermName.PERM_READ);
qd.setBrokerName("broker-a");
qdList.add(qd);
HashMap<String, List<QueueData>> topicQueueTable = new HashMap<>();
topicQueueTable.put("topic-a", qdList);
Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable");
filed.setAccessible(true);
filed.set(routeInfoManager, topicQueueTable);
int addTopicCnt = routeInfoManager.addWritePermOfBrokerByLock("broker-a");
assertThat(addTopicCnt).isEqualTo(1);
assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ | PermName.PERM_WRITE);
}
} }
\ No newline at end of file
...@@ -288,6 +288,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -288,6 +288,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName); return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName);
} }
@Override
public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.addWritePermOfBroker(namesrvAddr, brokerName);
}
@Override @Override
public void putKVConfig(String namespace, String key, String value) { public void putKVConfig(String namespace, String key, String value) {
defaultMQAdminExtImpl.putKVConfig(namespace, key, value); defaultMQAdminExtImpl.putKVConfig(namespace, key, value);
......
...@@ -382,6 +382,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -382,6 +382,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
} }
@Override
public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
}
@Override @Override
public void putKVConfig(String namespace, String key, String value) { public void putKVConfig(String namespace, String key, String value) {
} }
......
...@@ -134,6 +134,9 @@ public interface MQAdminExt extends MQAdmin { ...@@ -134,6 +134,9 @@ public interface MQAdminExt extends MQAdmin {
int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException;
int addWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException;
void putKVConfig(final String namespace, final String key, final String value); void putKVConfig(final String namespace, final String key, final String value);
String getKVConfig(final String namespace, String getKVConfig(final String namespace,
......
...@@ -61,6 +61,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; ...@@ -61,6 +61,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand;
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
...@@ -185,6 +186,7 @@ public class MQAdminStartup { ...@@ -185,6 +186,7 @@ public class MQAdminStartup {
initCommand(new DeleteKvConfigCommand()); initCommand(new DeleteKvConfigCommand());
initCommand(new WipeWritePermSubCommand()); initCommand(new WipeWritePermSubCommand());
initCommand(new AddWritePermSubCommand());
initCommand(new ResetOffsetByTimeCommand()); initCommand(new ResetOffsetByTimeCommand());
initCommand(new UpdateOrderConfCommand()); initCommand(new UpdateOrderConfCommand());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.List;
public class AddWritePermSubCommand implements SubCommand {
@Override
public String commandName() {
return "addWritePerm";
}
@Override
public String commandDesc() {
return "Add write perm of broker in all name server you defined in the -n param";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerName", true, "broker name");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String brokerName = commandLine.getOptionValue('b').trim();
List<String> namesrvList = defaultMQAdminExt.getNameServerAddressList();
if (namesrvList != null) {
for (String namesrvAddr : namesrvList) {
try {
int addTopicCount = defaultMQAdminExt.addWritePermOfBroker(namesrvAddr, brokerName);
System.out.printf("add write perm of broker[%s] in name server[%s] OK, %d%n",
brokerName,
namesrvAddr,
addTopicCount
);
} catch (Exception e) {
System.out.printf("add write perm of broker[%s] in name server[%s] Failed%n",
brokerName,
namesrvAddr
);
e.printStackTrace();
}
}
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
...@@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand { ...@@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand {
@Override @Override
public String commandDesc() { public String commandDesc() {
return "Wipe write perm of broker in all name server"; return "Wipe write perm of broker in all name server you defined in the -n param";
} }
@Override @Override
......
...@@ -209,6 +209,7 @@ public class DefaultMQAdminExtTest { ...@@ -209,6 +209,7 @@ public class DefaultMQAdminExtTest {
when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
when(mQClientAPIImpl.addWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(7);
TopicStatsTable topicStatsTable = new TopicStatsTable(); TopicStatsTable topicStatsTable = new TopicStatsTable();
topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>()); topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>());
...@@ -299,6 +300,12 @@ public class DefaultMQAdminExtTest { ...@@ -299,6 +300,12 @@ public class DefaultMQAdminExtTest {
assertThat(result).isEqualTo(6); assertThat(result).isEqualTo(6);
} }
@Test
public void testAddWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
int result = defaultMQAdminExt.addWritePermOfBroker("127.0.0.1:9876", "default-broker");
assertThat(result).isEqualTo(7);
}
@Test @Test
public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest"); TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.Test;
public class AddWritePermSubCommandTest {
@Test
public void testExecute() throws SubCommandException {
AddWritePermSubCommand cmd = new AddWritePermSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[]{"-b default-broker"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册