diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index 21511a96bea1d4ddc5ae43b51800c051311a4fa1..b0a7d7b5657d2d1aad0cc1a04bfa766d4b77feb2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -78,6 +78,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers()); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId())); adminClient = KSPartialKafkaAdminClient.create(props); KSListGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups( @@ -178,6 +179,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers()); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId())); adminClient = KSPartialKafkaAdminClient.create(props); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java index 20447798c17998b4f0a50f916f5a95c81bc16102..fccf35dd40e43ef33d88b23fd3309c99cb91c24a 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -76,10 +77,12 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { LOGGER.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId); - boolean allSuccess = this.closeAdminClientList(adminClientList); + boolean allSuccess = this.closeAdminClientList(clusterPhyId, adminClientList); if (allSuccess) { LOGGER.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId); + } else { + LOGGER.error("close kafka AdminClient exist failed and can ignore this error, clusterPhyId:{}", clusterPhyId); } } catch (Exception e) { LOGGER.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e); @@ -116,6 +119,7 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { adminClientList = new ArrayList<>(); for (int i = 0; i < clientCnt; ++i) { + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("ApacheAdminClient||clusterPhyId=%d||Cnt=%d", clusterPhyId, i)); adminClientList.add(AdminClient.create(props)); } @@ -125,7 +129,7 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { } catch (Exception e) { LOGGER.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e); - this.closeAdminClientList(adminClientList); + this.closeAdminClientList(clusterPhyId, adminClientList); } finally { modifyClientMapLock.unlock(); } @@ -133,7 +137,7 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId).get((int)(System.currentTimeMillis() % clientCnt)); } - private boolean closeAdminClientList(List adminClientList) { + private boolean closeAdminClientList(Long clusterPhyId, List adminClientList) { if (adminClientList == null) { return true; } @@ -141,9 +145,11 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { boolean allSuccess = true; for (AdminClient adminClient: adminClientList) { try { - adminClient.close(); + // 关闭客户端,超时时间为30秒 + adminClient.close(Duration.ofSeconds(30)); } catch (Exception e) { // ignore + LOGGER.error("close kafka AdminClient exist failed, clusterPhyId:{}", clusterPhyId, e); allSuccess = false; } }