diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index 27274aa5d43bd394585b22345a4e79f033db3012..0eea8623e394296482f6b190a8b354ccf8238262 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -26,21 +26,27 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { @Autowired private TopicConnectionDao topicConnectionDao; + private int splitInterval = 50; + @Override public void batchAdd(List doList) { if (ValidateUtils.isEmptyList(doList)) { return; } + int allSize = doList.size(); + int successSize = 0; - int count = 0; - for (TopicConnectionDO connectionDO: doList) { - try { - count += topicConnectionDao.replace(connectionDO); - } catch (Exception e) { - LOGGER.error("class=TopicConnectionServiceImpl||method=batchAdd||connectionDO={}||errMsg={}", connectionDO, e.getMessage()); - } + int part = doList.size() / splitInterval; + for (int i = 0; i < part; ++i) { + List subList = doList.subList(0, splitInterval); + successSize += topicConnectionDao.batchReplace(subList); + doList.subList(0, splitInterval).clear(); } - LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", doList.size(), count); + if (!ValidateUtils.isEmptyList(doList)) { + successSize += topicConnectionDao.batchReplace(doList); + } + LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", allSize, successSize); + } @Override diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java index e07bd6c3f85f98f6e510a6cc417b2e867e45d19c..189caf9047beeb814146cab5583e06d4a5ec6ac2 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java @@ -13,8 +13,6 @@ import java.util.List; public interface TopicConnectionDao { int batchReplace(List doList); - int replace(TopicConnectionDO topicConnectionDO); - List getByTopicName(Long clusterId, String topicName, Date startTime, Date endTime); List getByAppId(String appId, Date startTime, Date endTime); diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java index 9162e943be2c5499fa608174a1bd31c7d6ca3954..2c7da78f835821a02aa0a44a27513b85f355038c 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java @@ -27,29 +27,12 @@ public class TopicConnectionDaoImpl implements TopicConnectionDao { @Override public int batchReplace(List doList) { - int count = 0; - for (TopicConnectionDO elem: doList) { - try { - count += sqlSession.insert("TopicConnectionDao.replace", elem); - } catch (DeadlockLoserDataAccessException e1) { - - } catch (Exception e) { - LOGGER.error("add topic connection info, clusterId:{} topicName:{}." - , elem.getClusterId(), elem.getTopicName(), e); - } - } - return count; - } - - @Override - public int replace(TopicConnectionDO topicConnectionDO) { try { - return sqlSession.insert("TopicConnectionDao.replace", topicConnectionDO); + return sqlSession.insert("TopicConnectionDao.batchReplace", doList); } catch (DeadlockLoserDataAccessException e1) { return 0; } catch (Exception e) { - LOGGER.error("add topic connection info, clusterId:{} topicName:{}." - , topicConnectionDO.getClusterId(), topicConnectionDO.getTopicName(), e); + LOGGER.error("add topic connections info failed", e); } return 0; } diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicConnectionDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicConnectionDao.xml index bbb4f6268d29ca27e3046b48eb3c597d7ac690ba..b48e982a9a7ce5d250f07688edc41eebc13dc1ff 100644 --- a/kafka-manager-dao/src/main/resources/mapper/TopicConnectionDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/TopicConnectionDao.xml @@ -13,7 +13,7 @@ - + REPLACE INTO topic_connections ( cluster_id, topic_name, @@ -22,16 +22,19 @@ ip, client_version, create_time - ) - VALUES ( - #{clusterId}, - #{topicName}, - #{type}, - #{appId}, - #{ip}, - #{clientVersion}, - #{createTime} - ) + ) + VALUES + + ( + #{item.clusterId}, + #{item.topicName}, + #{item.type}, + #{item.appId}, + #{item.ip}, + #{item.clientVersion}, + #{item.createTime} + ) +