AuthorityServiceImpl.java 8.0 KB
Newer Older
Z
zengqiao 已提交
1 2 3 4 5 6
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;

import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperationStatusEnum;
7 8 9
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
Z
zengqiao 已提交
10 11 12 13
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.KafkaAclDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
14
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
Z
zengqiao 已提交
15 16 17 18 19 20 21 22 23
import com.xiaojukeji.kafka.manager.dao.gateway.KafkaAclDao;
import com.xiaojukeji.kafka.manager.service.service.OperateRecordService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

24 25
import java.util.*;
import java.util.stream.Collectors;
Z
zengqiao 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164

/**
 * @author zhongyuankai
 * @date 20/4/28
 */
@Service("authorityService")
public class AuthorityServiceImpl implements AuthorityService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AuthorityServiceImpl.class);

    @Autowired
    private AuthorityDao authorityDao;

    @Autowired
    private KafkaAclDao kafkaAclDao;

    @Autowired
    private QuotaService quotaService;

    @Autowired
    private OperateRecordService operateRecordService;

    @Override
    public int addAuthority(AuthorityDO authorityDO) {
        int result = 0;
        Integer newAccess = authorityDO.getAccess();
        try {
            // 权限只会增加, 不会减少, 这里做了新旧权限的merge
            AuthorityDO originAuthority = getAuthority(
                    authorityDO.getClusterId(),
                    authorityDO.getTopicName(),
                    authorityDO.getAppId()
            );
            if (!ValidateUtils.isNull(originAuthority)) {
                newAccess |= originAuthority.getAccess();
                authorityDO.setAccess(newAccess);
                if (newAccess.equals(originAuthority.getAccess())) {
                    // 新旧权限一致, 不需要做任何调整
                    return result;
                }
            }
            if (authorityDao.insert(authorityDO) < 1) {
                return result;
            }
            KafkaAclDO kafkaAclDO = new KafkaAclDO();
            kafkaAclDO.setTopicName(authorityDO.getTopicName());
            kafkaAclDO.setClusterId(authorityDO.getClusterId());
            kafkaAclDO.setAppId(authorityDO.getAppId());
            kafkaAclDO.setAccess(authorityDO.getAccess());
            kafkaAclDO.setOperation(OperationStatusEnum.CREATE.getCode());
            return kafkaAclDao.insert(kafkaAclDO);
        } catch (Exception e) {
            LOGGER.error("add authority failed, authorityDO:{}.", authorityDO, e);
        }
        return result;
    }

    @Override
    public ResultStatus deleteSpecifiedAccess(String appId, Long clusterId, String topicName, Integer access, String operator) {
        AuthorityDO authorityDO = getAuthority(clusterId, topicName, appId);
        if (ValidateUtils.isNull(authorityDO)) {
            return ResultStatus.AUTHORITY_NOT_EXIST;
        }

        if ((authorityDO.getAccess() & access) != access) {
            // 并不具备所要删除的权限, 返回错误
            return ResultStatus.PARAM_ILLEGAL;
        }

        int newAccess = authorityDO.getAccess() ^ access;
        authorityDO.setAccess(newAccess);
        try {
            if (authorityDao.insert(authorityDO) < 1) {
                return ResultStatus.OPERATION_FAILED;
            }

            // kafka_acl表, 删除权限时, 只需要存储所要删除的权限, 不需要存储权限的终态或者什么的
            KafkaAclDO kafkaAclDO = new KafkaAclDO();
            kafkaAclDO.setOperation(OperationStatusEnum.DELETE.getCode());
            kafkaAclDO.setAccess(access);
            kafkaAclDO.setAppId(appId);
            kafkaAclDO.setClusterId(clusterId);
            kafkaAclDO.setTopicName(topicName);
            if (kafkaAclDao.insert(kafkaAclDO) < 1) {
                return ResultStatus.OPERATION_FAILED;
            }

            // 记录操作
            Map<String, Object> content = new HashMap<>(4);
            content.put("clusterId", clusterId);
            content.put("topicName", topicName);
            content.put("access", access);
            content.put("appId", appId);
            OperateRecordDO operateRecordDO = new OperateRecordDO();
            operateRecordDO.setModuleId(ModuleEnum.AUTHORITY.getCode());
            operateRecordDO.setOperateId(OperateEnum.DELETE.getCode());
            operateRecordDO.setResource(topicName);
            operateRecordDO.setContent(JSONObject.toJSONString(content));
            operateRecordDO.setOperator(operator);
            operateRecordService.insert(operateRecordDO);
        } catch (Exception e) {
            LOGGER.error("delete authority failed, authorityDO:{}.", authorityDO, e);
        }
        return ResultStatus.SUCCESS;
    }

    @Override
    public AuthorityDO getAuthority(Long clusterId, String topicName, String appId) {
        List<AuthorityDO> authorityDOList = null;
        try {
            authorityDOList = authorityDao.getAuthority(clusterId, topicName, appId);
        } catch (Exception e) {
            LOGGER.error("get authority failed, clusterId:{}, topicName:{}, appId:{}.", clusterId, topicName, appId, e);
        }
        if (ValidateUtils.isEmptyList(authorityDOList)) {
            return null;
        }
        return authorityDOList.get(0);
    }

    @Override
    public List<AuthorityDO> getAuthorityByTopic(Long clusterId, String topicName) {
        try {
            return authorityDao.getAuthorityByTopic(clusterId, topicName);
        } catch (Exception e) {
            LOGGER.error("get authority failed, clusterId:{} topicName:{}.", clusterId, topicName, e);
        }
        return null;
    }

    @Override
    public List<AuthorityDO> getAuthority(String appId) {
        List<AuthorityDO> doList = null;
        try {
            doList = authorityDao.getByAppId(appId);
        } catch (Exception e) {
            LOGGER.error("get authority failed, appId:{}.", appId, e);
        }
        if (ValidateUtils.isEmptyList(doList)) {
            return new ArrayList<>();
165 166 167 168 169 170 171
        } else {
            assert doList != null;
            // 过滤权限列表中access=0的
            List<AuthorityDO> newList = doList.stream()
                    .filter(authorityDO -> !TopicAuthorityEnum.DENY.getCode().equals(authorityDO.getAccess()))
                    .collect(Collectors.toList());
            return newList;
Z
zengqiao 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
        }
    }

    @Override
    public List<AuthorityDO> listAll() {
        return authorityDao.listAll();
    }

    @Override
    public int addAuthorityAndQuota(AuthorityDO authorityDO, TopicQuota topicQuotaDO) {
        int result = 0;
        try {
            result = addAuthority(authorityDO);
            if (result < 1) {
                return result;
            }
            return quotaService.addTopicQuota(topicQuotaDO, authorityDO.getAccess());
        } catch (Exception e) {
            LOGGER.error("add authority and quota failed, authorityDO:{} topicQuotaDO:{}.",
                    authorityDO, topicQuotaDO, e);
            return result;
        }
    }

    @Override
    public Map<String, Map<Long, Map<String, AuthorityDO>>> getAllAuthority() {
        return authorityDao.getAllAuthority();
    }
Z
zengqiao 已提交
200 201 202 203 204 205

    @Override
    public int deleteAuthorityByTopic(Long clusterId, String topicName) {
        return authorityDao.deleteAuthorityByTopic(clusterId, topicName);
    }

Z
zengqiao 已提交
206
}