KafkaAdminClient.java 5.8 KB
Newer Older
Z
zengqiao 已提交
1 2
package com.xiaojukeji.know.streaming.km.persistence.kafka;

3 4
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
Z
zengqiao 已提交
5 6 7 8 9 10 11
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.persistence.AbstractClusterLoadedChangedHandler;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
12
import org.springframework.beans.factory.annotation.Value;
Z
zengqiao 已提交
13 14
import org.springframework.stereotype.Component;

15 16
import java.util.ArrayList;
import java.util.List;
Z
zengqiao 已提交
17 18 19 20 21 22
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler {
23 24 25 26 27 28
    private static final ILog LOGGER = LogFactory.getLog(KafkaAdminClient.class);

    @Value("${client-pool.kafka-admin.client-cnt:1}")
    private Integer clientCnt;

    private static final Map<Long, List<AdminClient>> KAFKA_ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
Z
zengqiao 已提交
29 30

    public AdminClient getClient(Long clusterPhyId) throws NotExistException {
31 32 33
        List<AdminClient> adminClientList = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId);
        if (adminClientList != null) {
            return adminClientList.get((int)(System.currentTimeMillis() % clientCnt));
Z
zengqiao 已提交
34 35
        }

36
        AdminClient adminClient = this.createKafkaAdminClient(clusterPhyId);
Z
zengqiao 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
        if (adminClient == null) {
            throw new NotExistException("kafka admin-client not exist due to create failed");
        }

        return adminClient;
    }

    /**************************************************** private method ****************************************************/

    @Override
    protected void add(ClusterPhy clusterPhy) {
        // ignore 后续按需创建,因此这里的操作直接忽略
    }

    @Override
    protected void modify(ClusterPhy newClusterPhy, ClusterPhy oldClusterPhy) {
        if (newClusterPhy.getBootstrapServers().equals(oldClusterPhy.getBootstrapServers())
                && newClusterPhy.getClientProperties().equals(oldClusterPhy.getClientProperties())) {
            // 集群信息虽然变化,但是服务地址和client配置没有变化,则直接返回
            return;
        }

        // 去除历史的,新的继续按需创建
        this.remove(newClusterPhy);
    }

    @Override
    protected void remove(ClusterPhy clusterPhy) {
        this.closeKafkaAdminClient(clusterPhy.getId());
    }

    private void closeKafkaAdminClient(Long clusterPhyId) {
        try {
            modifyClientMapLock.lock();

72 73
            List<AdminClient> adminClientList = KAFKA_ADMIN_CLIENT_MAP.remove(clusterPhyId);
            if (adminClientList == null) {
Z
zengqiao 已提交
74 75 76
                return;
            }

77
            LOGGER.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId);
Z
zengqiao 已提交
78

79
            boolean allSuccess = this.closeAdminClientList(adminClientList);
Z
zengqiao 已提交
80

81 82 83
            if (allSuccess) {
                LOGGER.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId);
            }
Z
zengqiao 已提交
84
        } catch (Exception e) {
85
            LOGGER.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e);
Z
zengqiao 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
        } finally {
            modifyClientMapLock.unlock();
        }
    }

    private AdminClient createKafkaAdminClient(Long clusterPhyId) throws NotExistException {
        ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
        if (clusterPhy == null) {
            throw new NotExistException(String.format("clusterPhyId:%d not exist", clusterPhyId));
        }

        return this.createKafkaAdminClient(clusterPhyId, clusterPhy.getBootstrapServers(), ConvertUtil.str2ObjByJson(clusterPhy.getClientProperties(), Properties.class));
    }

    private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapServers, Properties props) {
101
        List<AdminClient> adminClientList = null;
Z
zengqiao 已提交
102 103 104
        try {
            modifyClientMapLock.lock();

105 106 107
            adminClientList = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId);
            if (adminClientList != null) {
                return adminClientList.get((int)(System.currentTimeMillis() % clientCnt));
Z
zengqiao 已提交
108 109
            }

110
            LOGGER.debug("create kafka AdminClient starting, clusterPhyId:{} props:{}", clusterPhyId, props);
Z
zengqiao 已提交
111 112 113 114 115 116

            if (props == null) {
                props = new Properties();
            }
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

117 118 119 120 121 122 123 124
            adminClientList = new ArrayList<>();
            for (int i = 0; i < clientCnt; ++i) {
                adminClientList.add(AdminClient.create(props));
            }

            KAFKA_ADMIN_CLIENT_MAP.put(clusterPhyId, adminClientList);

            LOGGER.info("create kafka AdminClient success, clusterPhyId:{}", clusterPhyId);
Z
zengqiao 已提交
125
        } catch (Exception e) {
126 127 128
            LOGGER.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e);

            this.closeAdminClientList(adminClientList);
Z
zengqiao 已提交
129 130 131 132
        } finally {
            modifyClientMapLock.unlock();
        }

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
        return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId).get((int)(System.currentTimeMillis() % clientCnt));
    }

    private boolean closeAdminClientList(List<AdminClient> adminClientList) {
        if (adminClientList == null) {
            return true;
        }

        boolean allSuccess = true;
        for (AdminClient adminClient: adminClientList) {
            try {
                adminClient.close();
            } catch (Exception e) {
                // ignore
                allSuccess = false;
            }
        }

        return allSuccess;
Z
zengqiao 已提交
152 153
    }
}