From 8d964f45654db2759d30dd6f179312b8fc8be404 Mon Sep 17 00:00:00 2001 From: coder-zzzz <349427217@qq.com> Date: Mon, 16 Mar 2020 18:26:16 +0800 Subject: [PATCH] close #1848 --- .../rocketmq/client/impl/MQClientAPIImpl.java | 24 ++++++ .../client/impl/MQClientAPIImplTest.java | 28 ++++++- .../rocketmq/common/protocol/RequestCode.java | 2 + .../AddWritePermOfBrokerRequestHeader.java | 39 +++++++++ .../AddWritePermOfBrokerResponseHeader.java | 38 +++++++++ .../processor/DefaultRequestProcessor.java | 22 +++++ .../namesrv/routeinfo/RouteInfoManager.java | 39 ++++++--- .../routeinfo/RouteInfoManagerTest.java | 52 ++++++++++-- .../tools/admin/DefaultMQAdminExt.java | 5 ++ .../tools/admin/DefaultMQAdminExtImpl.java | 6 ++ .../rocketmq/tools/admin/MQAdminExt.java | 3 + .../tools/command/MQAdminStartup.java | 2 + .../namesrv/AddWritePermSubCommand.java | 80 +++++++++++++++++++ .../namesrv/WipeWritePermSubCommand.java | 2 +- .../tools/admin/DefaultMQAdminExtTest.java | 7 ++ .../namesrv/AddWritePermSubCommandTest.java | 37 +++++++++ 16 files changed, 365 insertions(+), 21 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 1ad5fbfe..107bb3a4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -135,6 +135,8 @@ import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRe import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; @@ -1432,6 +1434,28 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } + public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis) + throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader(); + requestHeader.setBrokerName(brokerName); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(nameSrvAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + AddWritePermOfBrokerResponseHeader responseHeader = + (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class); + return responseHeader.getAddTopicCount(); + } + default: + break; + } + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 3f00d9e4..e1b3bed7 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.impl; -import java.lang.reflect.Field; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -29,9 +28,11 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -48,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.lang.reflect.Field; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; @@ -442,4 +445,27 @@ public class MQClientAPIImplTest { requestHeader.setMaxReconsumeTimes(10); return requestHeader; } + + @Test + public void testAddWritePermOfBroker() throws Exception { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + RemotingCommand request = invocationOnMock.getArgument(1); + if (request.getCode() != RequestCode.ADD_WRITE_PERM_OF_BROKER) { + return null; + } + + RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class); + AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader(); + response.setCode(ResponseCode.SUCCESS); + responseHeader.setAddTopicCount(7); + response.addExtField("addTopicCount", String.valueOf(responseHeader.getAddTopicCount())); + return response; + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000); + assertThat(topicCnt).isEqualTo(7); + } } \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index b3009d73..0ce7c695 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -188,4 +188,6 @@ public class RequestCode { public static final int SEND_REPLY_MESSAGE_V2 = 325; public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; + + public static final int ADD_WRITE_PERM_OF_BROKER = 327; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java new file mode 100644 index 00000000..17fd3f5e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddWritePermOfBrokerRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java new file mode 100644 index 00000000..d217206a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddWritePermOfBrokerResponseHeader implements CommandCustomHeader { + @CFNotNull + private Integer addTopicCount; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public Integer getAddTopicCount() { + return addTopicCount; + } + + public void setAddTopicCount(Integer addTopicCount) { + this.addTopicCount = addTopicCount; + } +} diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 467078c4..f3ec595b 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.NamesrvUtil; @@ -102,6 +104,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); + case RequestCode.ADD_WRITE_PERM_OF_BROKER: + return this.addWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: @@ -394,6 +398,24 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } + private RemotingCommand addWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class); + final AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader(); + final AddWritePermOfBrokerRequestHeader requestHeader = (AddWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(AddWritePermOfBrokerRequestHeader.class); + + int addTopicCnt = this.namesrvController.getRouteInfoManager().addWritePermOfBrokerByLock(requestHeader.getBrokerName()); + + log.info("add write perm of broker[{}], client: {}, {}", + requestHeader.getBrokerName(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + addTopicCnt); + + responseHeader.setAddTopicCount(addTopicCnt); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index ecd057a2..f76a43da 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; @@ -252,40 +253,52 @@ public class RouteInfoManager { } public int wipeWritePermOfBrokerByLock(final String brokerName) { + return operateWritePermOfBrokerByLock(brokerName, RequestCode.WIPE_WRITE_PERM_OF_BROKER); + } + + public int addWritePermOfBrokerByLock(final String brokerName) { + return operateWritePermOfBrokerByLock(brokerName, RequestCode.ADD_WRITE_PERM_OF_BROKER); + } + + private int operateWritePermOfBrokerByLock(final String brokerName, final int requestCode) { try { try { this.lock.writeLock().lockInterruptibly(); - return wipeWritePermOfBroker(brokerName); + return operateWritePermOfBroker(brokerName, requestCode); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { - log.error("wipeWritePermOfBrokerByLock Exception", e); + log.error("operateWritePermOfBrokerByLock Exception", e); } return 0; } - private int wipeWritePermOfBroker(final String brokerName) { - int wipeTopicCnt = 0; - Iterator>> itTopic = this.topicQueueTable.entrySet().iterator(); - while (itTopic.hasNext()) { - Entry> entry = itTopic.next(); + + private int operateWritePermOfBroker(final String brokerName, final int requestCode) { + int topicCnt = 0; + for (Entry> entry : this.topicQueueTable.entrySet()) { List qdList = entry.getValue(); - Iterator it = qdList.iterator(); - while (it.hasNext()) { - QueueData qd = it.next(); + for (QueueData qd : qdList) { if (qd.getBrokerName().equals(brokerName)) { int perm = qd.getPerm(); - perm &= ~PermName.PERM_WRITE; + switch (requestCode) { + case RequestCode.WIPE_WRITE_PERM_OF_BROKER: + perm &= ~PermName.PERM_WRITE; + break; + case RequestCode.ADD_WRITE_PERM_OF_BROKER: + perm = PermName.PERM_READ | PermName.PERM_WRITE; + break; + } qd.setPerm(perm); - wipeTopicCnt++; + topicCnt++; } } } - return wipeTopicCnt; + return topicCnt; } public void unregisterBroker( diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java index 5ab77be1..e0d9e187 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java @@ -17,17 +17,23 @@ package org.apache.rocketmq.namesrv.routeinfo; import io.netty.channel.Channel; -import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -74,14 +80,28 @@ public class RouteInfoManagerTest { topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); Channel channel = mock(Channel.class); RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001", - topicConfigSerializeWrapper, new ArrayList(), channel); + topicConfigSerializeWrapper, new ArrayList(), channel); assertThat(registerBrokerResult).isNotNull(); } @Test - public void testWipeWritePermOfBrokerByLock() { - int result = routeInfoManager.wipeWritePermOfBrokerByLock("default-broker"); - assertThat(result).isEqualTo(0); + public void testWipeWritePermOfBrokerByLock() throws Exception { + List qdList = new ArrayList<>(); + QueueData qd = new QueueData(); + qd.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + qd.setBrokerName("broker-a"); + qdList.add(qd); + HashMap> topicQueueTable = new HashMap<>(); + topicQueueTable.put("topic-a", qdList); + + Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable"); + filed.setAccessible(true); + filed.set(routeInfoManager, topicQueueTable); + + int addTopicCnt = routeInfoManager.wipeWritePermOfBrokerByLock("broker-a"); + assertThat(addTopicCnt).isEqualTo(1); + assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ); + } @Test @@ -119,4 +139,24 @@ public class RouteInfoManagerTest { byte[] topicList = routeInfoManager.getHasUnitSubUnUnitTopicList(); assertThat(topicList).isNotNull(); } + + @Test + public void testAddWritePermOfBrokerByLock() throws Exception { + List qdList = new ArrayList<>(); + QueueData qd = new QueueData(); + qd.setPerm(PermName.PERM_READ); + qd.setBrokerName("broker-a"); + qdList.add(qd); + HashMap> topicQueueTable = new HashMap<>(); + topicQueueTable.put("topic-a", qdList); + + Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable"); + filed.setAccessible(true); + filed.set(routeInfoManager, topicQueueTable); + + int addTopicCnt = routeInfoManager.addWritePermOfBrokerByLock("broker-a"); + assertThat(addTopicCnt).isEqualTo(1); + assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ | PermName.PERM_WRITE); + + } } \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 1ca3fe4c..13b36c6c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -282,6 +282,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName); } + @Override + public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.addWritePermOfBroker(namesrvAddr, brokerName); + } + @Override public void putKVConfig(String namespace, String key, String value) { defaultMQAdminExtImpl.putKVConfig(namespace, key, value); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 22d4005c..5a1207a6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -382,6 +382,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); } + @Override + public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); + } + @Override public void putKVConfig(String namespace, String key, String value) { } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 17b62251..e523ace1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -134,6 +134,9 @@ public interface MQAdminExt extends MQAdmin { int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + int addWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + void putKVConfig(final String namespace, final String key, final String value); String getKVConfig(final String namespace, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 28431a96..5e1f0178 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -60,6 +60,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; +import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; @@ -183,6 +184,7 @@ public class MQAdminStartup { initCommand(new DeleteKvConfigCommand()); initCommand(new WipeWritePermSubCommand()); + initCommand(new AddWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new UpdateOrderConfCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java new file mode 100644 index 00000000..98542d06 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.util.List; + +public class AddWritePermSubCommand implements SubCommand { + @Override + public String commandName() { + return "addWritePerm"; + } + + @Override + public String commandDesc() { + return "Add write perm of broker in all name server you defined in the -n param"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerName", true, "broker name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + defaultMQAdminExt.start(); + String brokerName = commandLine.getOptionValue('b').trim(); + List namesrvList = defaultMQAdminExt.getNameServerAddressList(); + if (namesrvList != null) { + for (String namesrvAddr : namesrvList) { + try { + int addTopicCount = defaultMQAdminExt.addWritePermOfBroker(namesrvAddr, brokerName); + System.out.printf("add write perm of broker[%s] in name server[%s] OK, %d%n", + brokerName, + namesrvAddr, + addTopicCount + ); + } catch (Exception e) { + System.out.printf("add write perm of broker[%s] in name server[%s] Failed%n", + brokerName, + namesrvAddr + ); + e.printStackTrace(); + } + } + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java index f8868335..213931ed 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java @@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand { @Override public String commandDesc() { - return "Wipe write perm of broker in all name server"; + return "Wipe write perm of broker in all name server you defined in the -n param"; } @Override diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 3146b178..8a29e0f3 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -209,6 +209,7 @@ public class DefaultMQAdminExtTest { when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + when(mQClientAPIImpl.addWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(7); TopicStatsTable topicStatsTable = new TopicStatsTable(); topicStatsTable.setOffsetTable(new HashMap()); @@ -299,6 +300,12 @@ public class DefaultMQAdminExtTest { assertThat(result).isEqualTo(6); } + @Test + public void testAddWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { + int result = defaultMQAdminExt.addWritePermOfBroker("127.0.0.1:9876", "default-broker"); + assertThat(result).isEqualTo(7); + } + @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest"); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java new file mode 100644 index 00000000..901b8bbd --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.Test; + +public class AddWritePermSubCommandTest { + + @Test + public void testExecute() throws SubCommandException { + AddWritePermSubCommand cmd = new AddWritePermSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[]{"-b default-broker"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +} -- GitLab