From 46459426801d582b5ef71c8a3e38267ccd6caad9 Mon Sep 17 00:00:00 2001 From: panzhi Date: Wed, 24 Mar 2021 11:39:12 +0800 Subject: [PATCH] [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset --- .../broker/offset/ConsumerOffsetManager.java | 16 ++++++++++++++++ .../broker/processor/AdminBrokerProcessor.java | 4 ++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 3 ++- .../DeleteSubscriptionGroupRequestHeader.java | 10 ++++++++++ .../rocketmq/tools/admin/DefaultMQAdminExt.java | 7 +++++++ .../tools/admin/DefaultMQAdminExtImpl.java | 9 ++++++++- .../apache/rocketmq/tools/admin/MQAdminExt.java | 3 +++ .../consumer/DeleteSubscriptionGroupCommand.java | 13 +++++++++++-- 8 files changed, 61 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ebc9dd8a..bd057587 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager { } } + public void removeOffset(final String group) { + Iterator>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry> next = it.next(); + String topicAtGroup = next.getKey(); + if (topicAtGroup.contains(group)) { + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays.length == 2 && group.equals(arrays[1])) { + it.remove(); + log.warn("clean group offset {}", topicAtGroup); + } + } + } + + } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index dcdb7016..0a1d214b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); + if (requestHeader.isRemoveOffset()) { + this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName()); + } + if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 7a4d5565..63b2045d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1467,10 +1467,11 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis) + public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); requestHeader.setGroupName(groupName); + requestHeader.setRemoveOffset(removeOffset); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java index dff9e2f3..6591d778 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java @@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader @CFNotNull private String groupName; + private boolean removeOffset; + @Override public void checkFields() throws RemotingCommandException { } @@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader public void setGroupName(String groupName) { this.groupName = groupName; } + + public boolean isRemoveOffset() { + return removeOffset; + } + + public void setRemoveOffset(boolean removeOffset) { + this.removeOffset = removeOffset; + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index e80a813e..8b1c228f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -320,6 +320,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName); } + @Override + public void deleteSubscriptionGroup(String addr, + String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset); + } + @Override public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 22d4005c..5c343708 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis); + this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis); + } + + @Override + public void deleteSubscriptionGroup(String addr, + String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 17b62251..d5462cb0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin { void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void deleteSubscriptionGroup(final String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java index 96d81956..fb0efeba 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java @@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); + opt = new Option("r", "removeOffset", true, "remove offset"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { // groupName String groupName = commandLine.getOptionValue('g').trim(); + boolean removeOffset = false; + if (commandLine.hasOption('r')) { + removeOffset = Boolean.valueOf(commandLine.getOptionValue("r").trim()); + } + if (commandLine.hasOption('b')) { String addr = commandLine.getOptionValue('b').trim(); adminExt.start(); - adminExt.deleteSubscriptionGroup(addr, groupName); + adminExt.deleteSubscriptionGroup(addr, groupName, removeOffset); System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName, addr); @@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); for (String master : masterSet) { - adminExt.deleteSubscriptionGroup(master, groupName); + adminExt.deleteSubscriptionGroup(master, groupName, removeOffset); System.out.printf( "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n", groupName, master, clusterName); -- GitLab