diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ebc9dd8acce90512f1f0ddc17c62bbd866057729..bd0575875970f29fd4dc97d14bfcedbb762c2cab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager { } } + public void removeOffset(final String group) { + Iterator>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry> next = it.next(); + String topicAtGroup = next.getKey(); + if (topicAtGroup.contains(group)) { + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays.length == 2 && group.equals(arrays[1])) { + it.remove(); + log.warn("clean group offset {}", topicAtGroup); + } + } + } + + } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index dcdb701635884c243121f39dc09de67fbe1a226d..0a1d214b87f4288cf97e2fd3d76a37e28dd04d50 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); + if (requestHeader.isRemoveOffset()) { + this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName()); + } + if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 7a4d55654105b7449bebe4ca4a7eb17933466c1c..63b2045d1d5d5665217ac80c106cd703205c53eb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1467,10 +1467,11 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis) + public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); requestHeader.setGroupName(groupName); + requestHeader.setRemoveOffset(removeOffset); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java index dff9e2f35ce5d93478d4794816355b85048f3e29..6591d7780cd3df202424fff94109020f99f71739 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java @@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader @CFNotNull private String groupName; + private boolean removeOffset; + @Override public void checkFields() throws RemotingCommandException { } @@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader public void setGroupName(String groupName) { this.groupName = groupName; } + + public boolean isRemoveOffset() { + return removeOffset; + } + + public void setRemoveOffset(boolean removeOffset) { + this.removeOffset = removeOffset; + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index e80a813eca2dc9c3f965887154ba896c1272100e..8b1c228f4f7ec95d7035ad112119d02c904d72b8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -320,6 +320,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName); } + @Override + public void deleteSubscriptionGroup(String addr, + String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset); + } + @Override public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 22d4005ce9a9e031c0b10e1d09ae6b1a471cc65c..5c343708b32842fd8d21cb70e80ad973da2f0f2d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis); + this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis); + } + + @Override + public void deleteSubscriptionGroup(String addr, + String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 17b62251c60d5af461db085bb8587fbcf9f14f4f..d5462cb04e5a41873690c10cf67b880e21ad6bd1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin { void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void deleteSubscriptionGroup(final String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java index 96d81956e4540e4574a027ebf9a88ffd16284431..fb0efebaa215445a52c016f5a90abb5cf06e56db 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java @@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); + opt = new Option("r", "removeOffset", true, "remove offset"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { // groupName String groupName = commandLine.getOptionValue('g').trim(); + boolean removeOffset = false; + if (commandLine.hasOption('r')) { + removeOffset = Boolean.valueOf(commandLine.getOptionValue("r").trim()); + } + if (commandLine.hasOption('b')) { String addr = commandLine.getOptionValue('b').trim(); adminExt.start(); - adminExt.deleteSubscriptionGroup(addr, groupName); + adminExt.deleteSubscriptionGroup(addr, groupName, removeOffset); System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName, addr); @@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName); for (String master : masterSet) { - adminExt.deleteSubscriptionGroup(master, groupName); + adminExt.deleteSubscriptionGroup(master, groupName, removeOffset); System.out.printf( "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n", groupName, master, clusterName);