未验证 提交 a7309612 编写于 作者: E EricZeng 提交者: GitHub

[Optimize]统一DB元信息更新格式-Part2 (#1127)

1、KafkaMetaService修改为MetaService并移动到Core层;
2、修改ZK、KafkaACL的格式;
上级 6e56688a
package com.xiaojukeji.know.streaming.km.core.service.acl; package com.xiaojukeji.know.streaming.km.core.service.acl;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.resource.ResourceType;
import java.util.List; import java.util.List;
public interface KafkaAclService { public interface KafkaAclService extends MetaDataService<AclBinding> {
Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId);
List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId); List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId);
Integer countKafkaAclFromDB(Long clusterPhyId); Integer countKafkaAclFromDB(Long clusterPhyId);
...@@ -17,10 +15,5 @@ public interface KafkaAclService { ...@@ -17,10 +15,5 @@ public interface KafkaAclService {
Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType resourceType); Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType resourceType);
Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId); Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId);
List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType);
List<KafkaAclPO> getTopicAclFromDB(Long clusterPhyId, String topicName); List<KafkaAclPO> getTopicAclFromDB(Long clusterPhyId, String topicName);
List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName);
} }
...@@ -3,10 +3,6 @@ package com.xiaojukeji.know.streaming.km.core.service.acl; ...@@ -3,10 +3,6 @@ package com.xiaojukeji.know.streaming.km.core.service.acl;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.acl.ACLAtomParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.acl.ACLAtomParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import org.apache.kafka.common.resource.ResourceType;
import java.util.Date;
import java.util.List;
public interface OpKafkaAclService { public interface OpKafkaAclService {
/** /**
...@@ -19,14 +15,5 @@ public interface OpKafkaAclService { ...@@ -19,14 +15,5 @@ public interface OpKafkaAclService {
*/ */
Result<Void> deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator); Result<Void> deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator);
/**
* 删除ACL
*/
Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator);
Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO); Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO);
void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList);
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
} }
...@@ -11,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; ...@@ -11,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
...@@ -18,8 +19,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; ...@@ -18,8 +19,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO; import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
...@@ -36,11 +35,13 @@ import org.apache.kafka.common.resource.ResourceType; ...@@ -36,11 +35,13 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils; import org.apache.kafka.common.utils.SecurityUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.*;
import java.util.List; import java.util.function.Function;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters; import scala.jdk.javaapi.CollectionConverters;
...@@ -77,18 +78,49 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -77,18 +78,49 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
} }
@Override @Override
public Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId) { public Result<List<AclBinding>> getDataFromKafka(ClusterPhy clusterPhy) {
if (LoadedClusterPhyCache.getByPhyId(clusterPhyId) == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
try { try {
return (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhyId, ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhyId)); Result<List<AclBinding>> dataResult = (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhy.getId(), ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhy.getId()));
if (dataResult.failed()) {
Result.buildFromIgnoreData(dataResult);
}
return Result.buildSuc(dataResult.getData());
} catch (VCHandlerNotExistException e) { } catch (VCHandlerNotExistException e) {
return Result.buildFailure(e.getResultStatus()); return Result.buildFailure(e.getResultStatus());
} }
} }
@Override
public void writeToDB(Long clusterPhyId, List<AclBinding> dataList) {
Map<String, KafkaAclPO> dbPOMap = this.getKafkaAclFromDB(clusterPhyId).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity()));
long now = System.currentTimeMillis();
for (AclBinding aclBinding: dataList) {
KafkaAclPO newPO = KafkaAclConverter.convert2KafkaAclPO(clusterPhyId, aclBinding, now);
KafkaAclPO oldPO = dbPOMap.remove(newPO.getUniqueField());
if (oldPO == null) {
// 新增的ACL
this.insertAndIgnoreDuplicate(newPO);
}
// 不需要update
}
// 删除已经不存在的
for (KafkaAclPO dbPO: dbPOMap.values()) {
kafkaAclDAO.deleteById(dbPO);
}
}
@Override
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
return kafkaAclDAO.delete(lambdaQueryWrapper);
}
@Override @Override
public List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId) { public List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
...@@ -116,7 +148,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -116,7 +148,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
return 0; return 0;
} }
return (int)poList.stream().map(elem -> elem.getResourceName()).distinct().count(); return (int)poList.stream().map(KafkaAclPO::getResourceName).distinct().count();
} }
@Override @Override
...@@ -130,15 +162,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -130,15 +162,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
return 0; return 0;
} }
return (int)poList.stream().map(elem -> elem.getPrincipal()).distinct().count(); return (int)poList.stream().map(KafkaAclPO::getPrincipal).distinct().count();
}
@Override
public List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(KafkaAclPO::getResourceType, resType);
return kafkaAclDAO.selectList(queryWrapper);
} }
@Override @Override
...@@ -152,15 +176,6 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -152,15 +176,6 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
return kafkaAclDAO.selectList(queryWrapper); return kafkaAclDAO.selectList(queryWrapper);
} }
@Override
public List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(KafkaAclPO::getResourceType, ResourceType.GROUP.code());
queryWrapper.eq(KafkaAclPO::getResourceName, groupName);
return kafkaAclDAO.selectList(queryWrapper);
}
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
private Result<List<AclBinding>> getAclByZKClient(VersionItemParam itemParam){ private Result<List<AclBinding>> getAclByZKClient(VersionItemParam itemParam){
...@@ -170,7 +185,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -170,7 +185,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
for (ZkAclStore store: CollectionConverters.asJava(ZkAclStore.stores())) { for (ZkAclStore store: CollectionConverters.asJava(ZkAclStore.stores())) {
Result<List<AclBinding>> rl = this.getSpecifiedTypeAclByZKClient(param.getClusterPhyId(), store.patternType()); Result<List<AclBinding>> rl = this.getSpecifiedTypeAclByZKClient(param.getClusterPhyId(), store.patternType());
if (rl.failed()) { if (rl.failed()) {
return rl; return Result.buildFromIgnoreData(rl);
} }
aclList.addAll(rl.getData()); aclList.addAll(rl.getData());
...@@ -229,4 +244,19 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen ...@@ -229,4 +244,19 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
return Result.buildSuc(kafkaAclList); return Result.buildSuc(kafkaAclList);
} }
private Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
try {
kafkaAclDAO.insert(kafkaAclPO);
return Result.buildSuc();
} catch (DuplicateKeyException dke) {
// 直接写入,如果出现key冲突则直接忽略,因为key冲突时,表示该数据已完整存在,不需要替换任何数据
return Result.buildSuc();
} catch (Exception e) {
log.error("method=insertAndIgnoreDuplicate||kafkaAclPO={}||errMsg=exception", kafkaAclPO, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
}
}
} }
...@@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept ...@@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO; import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
...@@ -32,7 +31,6 @@ import org.apache.kafka.clients.admin.*; ...@@ -32,7 +31,6 @@ import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.acl.*; import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
...@@ -41,8 +39,6 @@ import scala.jdk.javaapi.CollectionConverters; ...@@ -41,8 +39,6 @@ import scala.jdk.javaapi.CollectionConverters;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.*; import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
...@@ -169,11 +165,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem ...@@ -169,11 +165,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem
return rv; return rv;
} }
@Override
public Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator) {
return Result.buildSuc();
}
@Override @Override
public Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) { public Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
try { try {
...@@ -190,34 +181,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem ...@@ -190,34 +181,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem
} }
} }
@Override
public void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList) {
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
Map<String, KafkaAclPO> dbPOMap = kafkaAclDAO.selectList(lambdaQueryWrapper).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity()));
for (KafkaAclPO po: poList) {
KafkaAclPO dbPO = dbPOMap.remove(po.getUniqueField());
if (dbPO == null) {
// 新增的ACL
this.insertAndIgnoreDuplicate(po);
}
}
// 删除已经不存在的
for (KafkaAclPO dbPO: dbPOMap.values()) {
kafkaAclDAO.deleteById(dbPO);
}
}
@Override
public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) {
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.le(KafkaAclPO::getUpdateTime, beforeTime);
return kafkaAclDAO.delete(lambdaQueryWrapper);
}
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
private Result<Void> deleteInDB(KafkaAclPO kafkaAclPO) { private Result<Void> deleteInDB(KafkaAclPO kafkaAclPO) {
......
...@@ -4,7 +4,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluste ...@@ -4,7 +4,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluste
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.meta.KafkaMetaService; import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
...@@ -14,7 +14,7 @@ import java.util.List; ...@@ -14,7 +14,7 @@ import java.util.List;
/** /**
* 查看Connector * 查看Connector
*/ */
public interface ConnectorService extends KafkaMetaService<KSConnector> { public interface ConnectorService extends MetaDataService<KSConnector> {
/** /**
* 获取所有的连接器名称列表 * 获取所有的连接器名称列表
*/ */
......
package com.xiaojukeji.know.streaming.km.common.bean.entity.meta; package com.xiaojukeji.know.streaming.km.core.service.meta;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
...@@ -13,7 +13,7 @@ import java.util.Set; ...@@ -13,7 +13,7 @@ import java.util.Set;
/** /**
* Kafka元信息服务接口 * Kafka元信息服务接口
*/ */
public interface KafkaMetaService<T> { public interface MetaDataService<T> {
/** /**
* 从Kafka中获取数据 * 从Kafka中获取数据
* @param connectCluster connect集群 * @param connectCluster connect集群
...@@ -26,19 +26,26 @@ public interface KafkaMetaService<T> { ...@@ -26,19 +26,26 @@ public interface KafkaMetaService<T> {
* @param clusterPhy kafka集群 * @param clusterPhy kafka集群
* @return 全部资源集合, 成功的资源列表 * @return 全部资源集合, 成功的资源列表
*/ */
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); } default Result<List<T>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new ArrayList<>()); }
/** /**
* 元信息同步至DB中 * 元信息同步至DB中
* @param clusterId 集群ID * @param clusterId 集群ID
* @param fullNameSet 全部资源列表 * @param fullResSet 全部资源列表
* @param dataList 成功的资源列表 * @param dataList 成功的资源列表
*/ */
default void writeToDB(Long clusterId, Set<String> fullNameSet, List<T> dataList) {} default void writeToDB(Long clusterId, Set<String> fullResSet, List<T> dataList) {}
/**
* 元信息同步至DB中
* @param clusterId 集群ID
* @param dataList 成功的资源列表
*/
default void writeToDB(Long clusterId, List<T> dataList) {}
/** /**
* 依据kafka集群ID删除数据 * 依据kafka集群ID删除数据
* @param clusterPhyId kafka集群ID * @param clusterPhyId kafka集群ID
*/ */
default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; } int deleteInDBByKafkaClusterId(Long clusterPhyId);
} }
package com.xiaojukeji.know.streaming.km.core.service.zookeeper; package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import java.util.List; import java.util.List;
public interface ZookeeperService { public interface ZookeeperService extends MetaDataService<ZookeeperInfo> {
/**
* 从ZK集群中获取ZK信息
*/
Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig);
void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList);
List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId); List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId);
/** /**
......
...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; ...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
...@@ -22,10 +23,8 @@ import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO ...@@ -22,10 +23,8 @@ import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap; import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
@Service @Service
public class ZookeeperServiceImpl implements ZookeeperService { public class ZookeeperServiceImpl implements ZookeeperService {
...@@ -35,14 +34,14 @@ public class ZookeeperServiceImpl implements ZookeeperService { ...@@ -35,14 +34,14 @@ public class ZookeeperServiceImpl implements ZookeeperService {
private ZookeeperDAO zookeeperDAO; private ZookeeperDAO zookeeperDAO;
@Override @Override
public Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) { public Result<List<ZookeeperInfo>> getDataFromKafka(ClusterPhy clusterPhy) {
List<Tuple<String, Integer>> addressList = null; List<Tuple<String, Integer>> addressList = null;
try { try {
addressList = ZookeeperUtils.connectStringParser(zookeeperAddress); addressList = ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper());
} catch (Exception e) { } catch (Exception e) {
LOGGER.error( LOGGER.error(
"method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!", "method=getDataFromKafka||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!",
clusterPhyId, zookeeperAddress, e clusterPhy.getId(), clusterPhy.getZookeeper(), e
); );
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage());
...@@ -51,24 +50,25 @@ public class ZookeeperServiceImpl implements ZookeeperService { ...@@ -51,24 +50,25 @@ public class ZookeeperServiceImpl implements ZookeeperService {
List<ZookeeperInfo> aliveZKList = new ArrayList<>(); List<ZookeeperInfo> aliveZKList = new ArrayList<>();
for (Tuple<String, Integer> hostPort: addressList) { for (Tuple<String, Integer> hostPort: addressList) {
aliveZKList.add(this.getFromZookeeperCluster( aliveZKList.add(this.getFromZookeeperCluster(
clusterPhyId, clusterPhy.getId(),
hostPort.getV1(), hostPort.getV1(),
hostPort.getV2(), hostPort.getV2(),
zkConfig ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
)); ));
} }
return Result.buildSuc(aliveZKList); return Result.buildSuc(aliveZKList);
} }
@Override @Override
public void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList) { public void writeToDB(Long clusterId, List<ZookeeperInfo> dataList) {
// DB 中的信息 // DB 中的信息
List<ZookeeperInfoPO> dbInfoList = this.listRawFromDBByCluster(clusterPhyId); Map<String, ZookeeperInfoPO> dbMap = this.listRawFromDBByCluster(clusterId)
Map<String, ZookeeperInfoPO> dbMap = new HashMap<>(); .stream()
dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem)); .collect(Collectors.toMap(elem -> elem.getHost() + elem.getPort(), elem -> elem, (oldValue, newValue) -> newValue));
// 新获取到的信息 // 新获取到的信息
List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class); List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(dataList, ZookeeperInfoPO.class);
for (ZookeeperInfoPO newInfo: newInfoList) { for (ZookeeperInfoPO newInfo: newInfoList) {
try { try {
ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort()); ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort());
...@@ -87,7 +87,7 @@ public class ZookeeperServiceImpl implements ZookeeperService { ...@@ -87,7 +87,7 @@ public class ZookeeperServiceImpl implements ZookeeperService {
zookeeperDAO.updateById(newInfo); zookeeperDAO.updateById(newInfo);
} }
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e); LOGGER.error("method=writeToDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterId, newInfo, e);
} }
} }
...@@ -96,11 +96,19 @@ public class ZookeeperServiceImpl implements ZookeeperService { ...@@ -96,11 +96,19 @@ public class ZookeeperServiceImpl implements ZookeeperService {
try { try {
zookeeperDAO.deleteById(entry.getValue().getId()); zookeeperDAO.deleteById(entry.getValue().getId());
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e); LOGGER.error("method=writeToDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterId, entry.getValue(), e);
} }
}); });
} }
@Override
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
LambdaQueryWrapper<ZookeeperInfoPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId);
return zookeeperDAO.delete(lambdaQueryWrapper);
}
@Override @Override
public List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId) { public List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId) {
return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class); return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class);
......
...@@ -3,19 +3,13 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata; ...@@ -3,19 +3,13 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata;
import com.didiglobal.logi.job.annotation.Task; import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBinding;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
@Task(name = "SyncKafkaAclTask", @Task(name = "SyncKafkaAclTask",
description = "KafkaAcl信息同步到DB", description = "KafkaAcl信息同步到DB",
...@@ -24,32 +18,18 @@ import java.util.stream.Collectors; ...@@ -24,32 +18,18 @@ import java.util.stream.Collectors;
consensual = ConsensualEnum.BROADCAST, consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60) timeout = 2 * 60)
public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask { public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask {
private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class);
@Autowired @Autowired
private KafkaAclService kafkaAclService; private KafkaAclService kafkaAclService;
@Autowired
private OpKafkaAclService opKafkaAclService;
@Override @Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId()); Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getDataFromKafka(clusterPhy);
if (aclBindingListResult.failed()) { if (aclBindingListResult.failed()) {
return TaskResult.FAIL; return TaskResult.FAIL;
} }
if (!aclBindingListResult.hasData()) { kafkaAclService.writeToDB(clusterPhy.getId(), aclBindingListResult.getData());
return TaskResult.SUCCESS;
}
// 更新DB数据
List<KafkaAclPO> poList = aclBindingListResult.getData()
.stream()
.map(elem -> KafkaAclConverter.convert2KafkaAclPO(clusterPhy.getId(), elem, triggerTimeUnitMs))
.collect(Collectors.toList());
opKafkaAclService.batchUpdateAcls(clusterPhy.getId(), poList);
return TaskResult.SUCCESS; return TaskResult.SUCCESS;
} }
} }
...@@ -3,12 +3,8 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata; ...@@ -3,12 +3,8 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata;
import com.didiglobal.logi.job.annotation.Task; import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -23,24 +19,17 @@ import java.util.List; ...@@ -23,24 +19,17 @@ import java.util.List;
consensual = ConsensualEnum.BROADCAST, consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60) timeout = 2 * 60)
public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask { public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask {
private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class);
@Autowired @Autowired
private ZookeeperService zookeeperService; private ZookeeperService zookeeperService;
@Override @Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
Result<List<ZookeeperInfo>> infoResult = zookeeperService.listFromZookeeper( Result<List<ZookeeperInfo>> infoResult = zookeeperService.getDataFromKafka(clusterPhy);
clusterPhy.getId(),
clusterPhy.getZookeeper(),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
);
if (infoResult.failed()) { if (infoResult.failed()) {
return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage()); return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage());
} }
zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData()); zookeeperService.writeToDB(clusterPhy.getId(), infoResult.getData());
return TaskResult.SUCCESS; return TaskResult.SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册