未验证 提交 46459426 编写于 作者: P panzhi 提交者: GitHub

[ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset

上级 735ecaa8
......@@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager {
}
}
public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
log.warn("clean group offset {}", topicAtGroup);
}
}
}
}
}
......@@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
if (requestHeader.isRemoveOffset()) {
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
}
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
}
......
......@@ -1467,10 +1467,11 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
requestHeader.setRemoveOffset(removeOffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
......
......@@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader
@CFNotNull
private String groupName;
private boolean removeOffset;
@Override
public void checkFields() throws RemotingCommandException {
}
......@@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public boolean isRemoveOffset() {
return removeOffset;
}
public void setRemoveOffset(boolean removeOffset) {
this.removeOffset = removeOffset;
}
}
......@@ -320,6 +320,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName);
}
@Override
public void deleteSubscriptionGroup(String addr,
String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset);
}
@Override
public void createAndUpdateKvConfig(String namespace, String key,
String value) throws RemotingException, MQBrokerException,
......
......@@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public void deleteSubscriptionGroup(String addr,
String groupName) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis);
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis);
}
@Override
public void deleteSubscriptionGroup(String addr,
String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis);
}
@Override
......
......@@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin {
void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void deleteSubscriptionGroup(final String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createAndUpdateKvConfig(String namespace, String key,
String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
......
......@@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("r", "removeOffset", true, "remove offset");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
// groupName
String groupName = commandLine.getOptionValue('g').trim();
boolean removeOffset = false;
if (commandLine.hasOption('r')) {
removeOffset = Boolean.valueOf(commandLine.getOptionValue("r").trim());
}
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
adminExt.start();
adminExt.deleteSubscriptionGroup(addr, groupName);
adminExt.deleteSubscriptionGroup(addr, groupName, removeOffset);
System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
addr);
......@@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
for (String master : masterSet) {
adminExt.deleteSubscriptionGroup(master, groupName);
adminExt.deleteSubscriptionGroup(master, groupName, removeOffset);
System.out.printf(
"delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
groupName, master, clusterName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册