TopicDaoImpl.java 4.2 KB
Newer Older
Z
zengqiao 已提交
1 2 3 4
package com.xiaojukeji.kafka.manager.dao.impl;

import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.dao.TopicDao;
Z
zengqiao 已提交
5
import com.xiaojukeji.kafka.manager.task.Constant;
Z
zengqiao 已提交
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author zengqiao
 * @date 19/7/12
 */
@Repository("TopicDao")
public class TopicDaoImpl implements TopicDao {
    /**
     * Topic最近的一次更新时间, 更新之后的缓存
     */
Z
zengqiao 已提交
22 23
    private static volatile long TOPIC_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP;

Z
zengqiao 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
    private static final Map<Long, Map<String, TopicDO>> TOPIC_MAP = new ConcurrentHashMap<>();

    @Autowired
    private SqlSessionTemplate sqlSession;

    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    @Override
    public int insert(TopicDO topicDO) {
        return sqlSession.insert("TopicDao.insert", topicDO);
    }

    @Override
    public int deleteById(Long id) {
        return sqlSession.delete("TopicDao.deleteById", id);
    }

    @Override
    public int deleteByName(Long clusterId, String topicName) {
        Map<String, Object> params = new HashMap<>(2);
        params.put("clusterId", clusterId);
        params.put("topicName", topicName);
        return sqlSession.delete("TopicDao.deleteByName", params);
    }

    @Override
    public int updateByName(TopicDO topicDO) {
        return sqlSession.update("TopicDao.updateByName", topicDO);
    }

    @Override
    public TopicDO getByTopicName(Long clusterId, String topicName) {
        Map<String, Object> params = new HashMap<>(2);
        params.put("clusterId", clusterId);
        params.put("topicName", topicName);
        return sqlSession.selectOne("TopicDao.getByTopicName", params);
    }

    @Override
    public List<TopicDO> getByClusterId(Long clusterId) {
        updateTopicCache();
Z
zengqiao 已提交
67
        return new ArrayList<>(TOPIC_MAP.getOrDefault(clusterId, Collections.emptyMap()).values());
Z
zengqiao 已提交
68 69 70 71 72 73 74 75 76 77 78 79
    }

    @Override
    public List<TopicDO> getByAppId(String appId) {
        return sqlSession.selectList("TopicDao.getByAppId", appId);
    }

    @Override
    public List<TopicDO> listAll() {
        updateTopicCache();
        List<TopicDO> doList = new ArrayList<>();
        for (Long clusterId: TOPIC_MAP.keySet()) {
Z
zengqiao 已提交
80
            doList.addAll(TOPIC_MAP.getOrDefault(clusterId, Collections.emptyMap()).values());
Z
zengqiao 已提交
81 82 83 84 85 86
        }
        return doList;
    }

    @Override
    public TopicDO getTopic(Long clusterId, String topicName, String appId) {
Z
zengqiao 已提交
87
        Map<String, Object> params = new HashMap<>(3);
Z
zengqiao 已提交
88 89 90 91 92 93 94
        params.put("clusterId", clusterId);
        params.put("topicName", topicName);
        params.put("appId", appId);
        return sqlSession.selectOne("TopicDao.getTopic", params);
    }

    private void updateTopicCache() {
95
        long timestamp = System.currentTimeMillis();
Z
zengqiao 已提交
96

Z
zengqiao 已提交
97 98 99 100 101
        if (timestamp + 1000 <= TOPIC_CACHE_LATEST_UPDATE_TIME) {
            // 近一秒内的请求不走db
            return;
        }

Z
zengqiao 已提交
102 103 104 105 106 107 108 109
        Date afterTime = new Date(TOPIC_CACHE_LATEST_UPDATE_TIME);
        List<TopicDO> doList = sqlSession.selectList("TopicDao.listAfterTime", afterTime);
        updateTopicCache(doList, timestamp);
    }

    /**
     * 更新Topic缓存
     */
Z
zengqiao 已提交
110
    private synchronized void updateTopicCache(List<TopicDO> doList, Long timestamp) {
111 112 113 114
        if (TOPIC_CACHE_LATEST_UPDATE_TIME == Constant.START_TIMESTAMP) {
            TOPIC_MAP.clear();
        }

Z
zengqiao 已提交
115 116 117 118
        if (doList == null || doList.isEmpty() || TOPIC_CACHE_LATEST_UPDATE_TIME >= timestamp) {
            // 本次无数据更新, 或者本次更新过时 时, 忽略本次更新
            return;
        }
Z
zengqiao 已提交
119

Z
zengqiao 已提交
120 121 122 123 124 125 126
        for (TopicDO elem: doList) {
            Map<String, TopicDO> doMap = TOPIC_MAP.getOrDefault(elem.getClusterId(), new ConcurrentHashMap<>());
            doMap.put(elem.getTopicName(), elem);
            TOPIC_MAP.put(elem.getClusterId(), doMap);
        }
        TOPIC_CACHE_LATEST_UPDATE_TIME = timestamp;
    }
Z
zengqiao 已提交
127 128 129 130

    public static void resetCache() {
        TOPIC_CACHE_LATEST_UPDATE_TIME = Constant.START_TIMESTAMP;
    }
Z
zengqiao 已提交
131
}