diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 63b2045d1d5d5665217ac80c106cd703205c53eb..3bbdd84f5221ea5298336e92f38b8f24a2ccfa6d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -135,6 +135,8 @@ import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRe import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; @@ -1428,6 +1430,28 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } + public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis) + throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader(); + requestHeader.setBrokerName(brokerName); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(nameSrvAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + AddWritePermOfBrokerResponseHeader responseHeader = + (AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class); + return responseHeader.getAddTopicCount(); + } + default: + break; + } + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 3f00d9e4030473f395c9b2a037e50b7e6fde24f0..e1b3bed76fde8416b4c2c2839ed06cfa91c7f393 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.impl; -import java.lang.reflect.Field; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -29,9 +28,11 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -48,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.lang.reflect.Field; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; @@ -442,4 +445,27 @@ public class MQClientAPIImplTest { requestHeader.setMaxReconsumeTimes(10); return requestHeader; } + + @Test + public void testAddWritePermOfBroker() throws Exception { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + RemotingCommand request = invocationOnMock.getArgument(1); + if (request.getCode() != RequestCode.ADD_WRITE_PERM_OF_BROKER) { + return null; + } + + RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class); + AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader(); + response.setCode(ResponseCode.SUCCESS); + responseHeader.setAddTopicCount(7); + response.addExtField("addTopicCount", String.valueOf(responseHeader.getAddTopicCount())); + return response; + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000); + assertThat(topicCnt).isEqualTo(7); + } } \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 75ceff38cfb9b8bbd086007be095012c0d076f8d..5624a7ec01c7247ac202739b00bf164ed40986f5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -188,4 +188,6 @@ public class RequestCode { public static final int SEND_REPLY_MESSAGE_V2 = 325; public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; + + public static final int ADD_WRITE_PERM_OF_BROKER = 327; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..17fd3f5ea7e887cfbb973834fdcf10665982eb9f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerRequestHeader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddWritePermOfBrokerRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..d217206a9403176050404f3a8b53d25b8379c1cf --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/AddWritePermOfBrokerResponseHeader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddWritePermOfBrokerResponseHeader implements CommandCustomHeader { + @CFNotNull + private Integer addTopicCount; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public Integer getAddTopicCount() { + return addTopicCount; + } + + public void setAddTopicCount(Integer addTopicCount) { + this.addTopicCount = addTopicCount; + } +} diff --git a/docs/cn/operation.md b/docs/cn/operation.md index 5f5d9f861111961920b4a9e0d501ed43a7e1b4d8..e5275887b5ffc2ef22400aa3d3829c89b9a9b94f 100644 --- a/docs/cn/operation.md +++ b/docs/cn/operation.md @@ -566,6 +566,14 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker -b BrokerName + + addWritePerm + 从NameServer上添加 Broker写权限 + -b + BrokerName + -n NameServer 服务地址,格式 ip:port diff --git a/docs/en/CLITools.md b/docs/en/CLITools.md index 47ba69e57124f2cfa387c513f0ffdb4b3016853c..c53b69bf8c0297f4bca0c83d277c3245f60d15d0 100644 --- a/docs/en/CLITools.md +++ b/docs/en/CLITools.md @@ -426,6 +426,14 @@ Before introducing the mqadmin management tool, the following points need to be -b Declare the BrokerName + + addWritePerm + Add write permissions for broker from nameServer + -b + Declare the BrokerName + -n Service address used to specify nameServer and formatted as ip:port diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index f8bc55e7aab38e1f27abc75c674c28eab0a9d02b..98e96dfe7db5ca74fe42537415834e815b586c29 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.NamesrvUtil; @@ -103,6 +105,8 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); + case RequestCode.ADD_WRITE_PERM_OF_BROKER: + return this.addWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: @@ -402,6 +406,24 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen return response; } + private RemotingCommand addWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class); + final AddWritePermOfBrokerResponseHeader responseHeader = (AddWritePermOfBrokerResponseHeader) response.readCustomHeader(); + final AddWritePermOfBrokerRequestHeader requestHeader = (AddWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(AddWritePermOfBrokerRequestHeader.class); + + int addTopicCnt = this.namesrvController.getRouteInfoManager().addWritePermOfBrokerByLock(requestHeader.getBrokerName()); + + log.info("add write perm of broker[{}], client: {}, {}", + requestHeader.getBrokerName(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + addTopicCnt); + + responseHeader.setAddTopicCount(addTopicCnt); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index edef87ce2d7e5d18f00610ad2619a55357d7a5d5..982d5439469d6da1f418d18996cace47543f51f2 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; @@ -252,40 +253,52 @@ public class RouteInfoManager { } public int wipeWritePermOfBrokerByLock(final String brokerName) { + return operateWritePermOfBrokerByLock(brokerName, RequestCode.WIPE_WRITE_PERM_OF_BROKER); + } + + public int addWritePermOfBrokerByLock(final String brokerName) { + return operateWritePermOfBrokerByLock(brokerName, RequestCode.ADD_WRITE_PERM_OF_BROKER); + } + + private int operateWritePermOfBrokerByLock(final String brokerName, final int requestCode) { try { try { this.lock.writeLock().lockInterruptibly(); - return wipeWritePermOfBroker(brokerName); + return operateWritePermOfBroker(brokerName, requestCode); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { - log.error("wipeWritePermOfBrokerByLock Exception", e); + log.error("operateWritePermOfBrokerByLock Exception", e); } return 0; } - private int wipeWritePermOfBroker(final String brokerName) { - int wipeTopicCnt = 0; - Iterator>> itTopic = this.topicQueueTable.entrySet().iterator(); - while (itTopic.hasNext()) { - Entry> entry = itTopic.next(); + + private int operateWritePermOfBroker(final String brokerName, final int requestCode) { + int topicCnt = 0; + for (Entry> entry : this.topicQueueTable.entrySet()) { List qdList = entry.getValue(); - Iterator it = qdList.iterator(); - while (it.hasNext()) { - QueueData qd = it.next(); + for (QueueData qd : qdList) { if (qd.getBrokerName().equals(brokerName)) { int perm = qd.getPerm(); - perm &= ~PermName.PERM_WRITE; + switch (requestCode) { + case RequestCode.WIPE_WRITE_PERM_OF_BROKER: + perm &= ~PermName.PERM_WRITE; + break; + case RequestCode.ADD_WRITE_PERM_OF_BROKER: + perm = PermName.PERM_READ | PermName.PERM_WRITE; + break; + } qd.setPerm(perm); - wipeTopicCnt++; + topicCnt++; } } } - return wipeTopicCnt; + return topicCnt; } public void unregisterBroker( diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java index 5ab77be1ebe115cf60c4290b5fc44a8e28aec01c..e0d9e1871c04aabc25cd626edebdeb2b08b8d315 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java @@ -17,17 +17,23 @@ package org.apache.rocketmq.namesrv.routeinfo; import io.netty.channel.Channel; -import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -74,14 +80,28 @@ public class RouteInfoManagerTest { topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); Channel channel = mock(Channel.class); RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001", - topicConfigSerializeWrapper, new ArrayList(), channel); + topicConfigSerializeWrapper, new ArrayList(), channel); assertThat(registerBrokerResult).isNotNull(); } @Test - public void testWipeWritePermOfBrokerByLock() { - int result = routeInfoManager.wipeWritePermOfBrokerByLock("default-broker"); - assertThat(result).isEqualTo(0); + public void testWipeWritePermOfBrokerByLock() throws Exception { + List qdList = new ArrayList<>(); + QueueData qd = new QueueData(); + qd.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + qd.setBrokerName("broker-a"); + qdList.add(qd); + HashMap> topicQueueTable = new HashMap<>(); + topicQueueTable.put("topic-a", qdList); + + Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable"); + filed.setAccessible(true); + filed.set(routeInfoManager, topicQueueTable); + + int addTopicCnt = routeInfoManager.wipeWritePermOfBrokerByLock("broker-a"); + assertThat(addTopicCnt).isEqualTo(1); + assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ); + } @Test @@ -119,4 +139,24 @@ public class RouteInfoManagerTest { byte[] topicList = routeInfoManager.getHasUnitSubUnUnitTopicList(); assertThat(topicList).isNotNull(); } + + @Test + public void testAddWritePermOfBrokerByLock() throws Exception { + List qdList = new ArrayList<>(); + QueueData qd = new QueueData(); + qd.setPerm(PermName.PERM_READ); + qd.setBrokerName("broker-a"); + qdList.add(qd); + HashMap> topicQueueTable = new HashMap<>(); + topicQueueTable.put("topic-a", qdList); + + Field filed = RouteInfoManager.class.getDeclaredField("topicQueueTable"); + filed.setAccessible(true); + filed.set(routeInfoManager, topicQueueTable); + + int addTopicCnt = routeInfoManager.addWritePermOfBrokerByLock("broker-a"); + assertThat(addTopicCnt).isEqualTo(1); + assertThat(qd.getPerm()).isEqualTo(PermName.PERM_READ | PermName.PERM_WRITE); + + } } \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 6592639035f922a362f7503342d34f7ff06ac11b..c8f5ccf38c74ec41bc9bea5c878de6e09490b71a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -288,6 +288,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName); } + @Override + public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.addWritePermOfBroker(namesrvAddr, brokerName); + } + @Override public void putKVConfig(String namespace, String key, String value) { defaultMQAdminExtImpl.putKVConfig(namespace, key, value); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 8930bbe49d207ab2dc361caaee6d88f75762cbea..c7636d907214b788bbd602b3c95ee8c7178b949a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -382,6 +382,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); } + @Override + public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); + } + @Override public void putKVConfig(String namespace, String key, String value) { } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index d5462cb04e5a41873690c10cf67b880e21ad6bd1..a06c2c45e654181cb415fea1dc81df8806c728bc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -134,6 +134,9 @@ public interface MQAdminExt extends MQAdmin { int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + int addWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException; + void putKVConfig(final String namespace, final String key, final String value); String getKVConfig(final String namespace, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f9477445bea3d895dd9f52a0c7f7a1127ee95d67..25434dae4f85b2c4ec11a787b940c612351d203d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; +import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; @@ -185,6 +186,7 @@ public class MQAdminStartup { initCommand(new DeleteKvConfigCommand()); initCommand(new WipeWritePermSubCommand()); + initCommand(new AddWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new UpdateOrderConfCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..98542d065d5459ece5922b2af2f3260c7e350bd5 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.util.List; + +public class AddWritePermSubCommand implements SubCommand { + @Override + public String commandName() { + return "addWritePerm"; + } + + @Override + public String commandDesc() { + return "Add write perm of broker in all name server you defined in the -n param"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerName", true, "broker name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + defaultMQAdminExt.start(); + String brokerName = commandLine.getOptionValue('b').trim(); + List namesrvList = defaultMQAdminExt.getNameServerAddressList(); + if (namesrvList != null) { + for (String namesrvAddr : namesrvList) { + try { + int addTopicCount = defaultMQAdminExt.addWritePermOfBroker(namesrvAddr, brokerName); + System.out.printf("add write perm of broker[%s] in name server[%s] OK, %d%n", + brokerName, + namesrvAddr, + addTopicCount + ); + } catch (Exception e) { + System.out.printf("add write perm of broker[%s] in name server[%s] Failed%n", + brokerName, + namesrvAddr + ); + e.printStackTrace(); + } + } + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java index f8868335a18fd68acdd5c1eb2406930d4300149a..213931ed86e16beb3df37dc1a0161c56a5c9572c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java @@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand { @Override public String commandDesc() { - return "Wipe write perm of broker in all name server"; + return "Wipe write perm of broker in all name server you defined in the -n param"; } @Override diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 3146b1781154792a2d7928e0306e9cf172b5ddac..8a29e0f34ec22b03e87d3275d890557945ab449c 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -209,6 +209,7 @@ public class DefaultMQAdminExtTest { when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection); when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6); + when(mQClientAPIImpl.addWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(7); TopicStatsTable topicStatsTable = new TopicStatsTable(); topicStatsTable.setOffsetTable(new HashMap()); @@ -299,6 +300,12 @@ public class DefaultMQAdminExtTest { assertThat(result).isEqualTo(6); } + @Test + public void testAddWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException { + int result = defaultMQAdminExt.addWritePermOfBroker("127.0.0.1:9876", "default-broker"); + assertThat(result).isEqualTo(7); + } + @Test public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException { TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo("UnitTest"); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java new file mode 100644 index 0000000000000000000000000000000000000000..901b8bbd3be25783554d251b98b1db26bf806f76 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.namesrv; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.Test; + +public class AddWritePermSubCommandTest { + + @Test + public void testExecute() throws SubCommandException { + AddWritePermSubCommand cmd = new AddWritePermSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[]{"-b default-broker"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +}