From 4c53b503218b6e89e407f5049dfe16b16610b558 Mon Sep 17 00:00:00 2001 From: sunshihuan <13733918655@163.com> Date: Mon, 18 Apr 2022 10:10:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/redis/service/RedisService.java | 44 +------------- .../thinglinks-common-rocketmq/pom.xml | 6 ++ .../constant/ConsumerGroupConstant.java | 20 +++++++ .../constant/ConsumerTopicConstant.java | 24 ++++++++ .../common/rocketmq/domain/MQMessage.java | 29 +++++++++ .../link/common/init/InitDataBase.java | 44 ++++++++++++++ .../consumer/DeviceActionMessageConsumer.java | 3 +- .../controller/product/ProductController.java | 15 +++-- .../link/mapper/product/ProductMapper.java | 3 + .../link/service/product/ProductService.java | 12 +++- .../product/impl/ProductServiceImpl.java | 37 ++++++++++-- .../mapper/link/product/ProductMapper.xml | 15 +++++ ...roductCreateSuperTableMessageConsumer.java | 18 ++++-- .../common/{ => init}/InitDataBase.java | 6 +- .../tdengine/mapper/TdEngineMapper.java | 8 +++ ...roductSuperTableCreateOrUpdateService.java | 2 +- .../tdengine/service/TdEngineService.java | 2 +- ...ctSuperTableCreateOrUpdateServiceImpl.java | 14 ++--- .../service/impl/TdEngineServiceImpl.java | 59 ++++++++++++++++--- .../main/resources/mapper/TdEngineMapper.xml | 4 ++ .../monitor/config/mq/ConsumerCollection.java | 3 +- 21 files changed, 285 insertions(+), 83 deletions(-) create mode 100644 thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerGroupConstant.java create mode 100644 thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerTopicConstant.java create mode 100644 thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/domain/MQMessage.java create mode 100644 thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/init/InitDataBase.java rename thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/{ => common}/consumer/ProductCreateSuperTableMessageConsumer.java (66%) rename thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/{ => init}/InitDataBase.java (84%) diff --git a/thinglinks-common/thinglinks-common-redis/src/main/java/com/mqttsnet/thinglinks/common/redis/service/RedisService.java b/thinglinks-common/thinglinks-common-redis/src/main/java/com/mqttsnet/thinglinks/common/redis/service/RedisService.java index d65355f..5134ce2 100644 --- a/thinglinks-common/thinglinks-common-redis/src/main/java/com/mqttsnet/thinglinks/common/redis/service/RedisService.java +++ b/thinglinks-common/thinglinks-common-redis/src/main/java/com/mqttsnet/thinglinks/common/redis/service/RedisService.java @@ -1,17 +1,15 @@ package com.mqttsnet.thinglinks.common.redis.service; -import java.util.*; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; -import org.apache.commons.compress.utils.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.core.*; import org.springframework.data.redis.core.script.DefaultRedisScript; -import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.stereotype.Component; +import java.util.*; +import java.util.concurrent.TimeUnit; + /** * spring redis 工具类 * @@ -1621,40 +1619,4 @@ public class RedisService public Cursor> zScan(String key, ScanOptions options) { return redisTemplate.opsForZSet().scan(key, options); } - - /** - * 模糊匹配key - * - * @param match - * @param count - * @return - */ - public Cursor scan(String match, int count) { - ScanOptions scanOptions = ScanOptions.scanOptions().match(match).count(count).build(); - RedisSerializer redisSerializer = (RedisSerializer) redisTemplate.getKeySerializer(); - Cursor cursor = (Cursor) redisTemplate.executeWithStickyConnection((RedisCallback) redisConnection -> - new ConvertingCursor<>(redisConnection.scan(scanOptions), redisSerializer::deserialize)); - return cursor; - } - - /** - * scan模糊匹配删除 - * - * @param match - * @param count - */ - public void scanDelete(String match, int count) { - try { - List keys = Lists.newArrayList(); - Cursor cursor = scan(match, count); - while (cursor.hasNext()) { - //找到一次就添加一次 - keys.add(cursor.next()); - } - cursor.close(); - final long deleteObjectCount = deleteObject(keys); - } catch (Exception e) { - log.error("scanDelete error {}", e.getMessage()); - } - } } diff --git a/thinglinks-common/thinglinks-common-rocketmq/pom.xml b/thinglinks-common/thinglinks-common-rocketmq/pom.xml index 9d50d63..ac17f4a 100644 --- a/thinglinks-common/thinglinks-common-rocketmq/pom.xml +++ b/thinglinks-common/thinglinks-common-rocketmq/pom.xml @@ -24,6 +24,12 @@ 2.2.1 + + + com.mqttsnet + thinglinks-common-core + ${thinglinks.version} + \ No newline at end of file diff --git a/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerGroupConstant.java b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerGroupConstant.java new file mode 100644 index 0000000..1078cbd --- /dev/null +++ b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerGroupConstant.java @@ -0,0 +1,20 @@ +package com.mqttsnet.thinglinks.common.rocketmq.constant; + +/** + * @Description: 消费者组常量 + * @Author: ShiHuan SUN + * @E-mail: 13733918655@163.com + * @Website: http://thinglinks.mqttsnet.com + * @CreateDate: 2022/4/15$ 15:53$ + * @UpdateUser: ShiHuan SUN + * @UpdateDate: 2022/4/15$ 15:53$ + * @UpdateRemark: 修改内容 + * @Version: V1.0 + */ +public class ConsumerGroupConstant { + /** + * default-consumer-group + */ + public static final String THINGLINKS_GROUP = "thinglinks"; + +} diff --git a/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerTopicConstant.java b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerTopicConstant.java new file mode 100644 index 0000000..89da6a9 --- /dev/null +++ b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/constant/ConsumerTopicConstant.java @@ -0,0 +1,24 @@ +package com.mqttsnet.thinglinks.common.rocketmq.constant; + +import lombok.Data; + +/** + * @Description: 消费者主题常量 + * @Author: ShiHuan SUN + * @E-mail: 13733918655@163.com + * @Website: http://thinglinks.mqttsnet.com + * @CreateDate: 2022/4/15$ 15:53$ + * @UpdateUser: ShiHuan SUN + * @UpdateDate: 2022/4/15$ 15:53$ + * @UpdateRemark: 修改内容 + * @Version: V1.0 + */ +@Data +public class ConsumerTopicConstant { + + /** + * TDengine超级表创键修改动作监听主题 + */ + public static final String PRODUCTSUPERTABLE_CREATEORUPDATE = "productSuperTable-createOrUpdate"; + +} diff --git a/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/domain/MQMessage.java b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/domain/MQMessage.java new file mode 100644 index 0000000..1765aaa --- /dev/null +++ b/thinglinks-common/thinglinks-common-rocketmq/src/main/java/com/mqttsnet/thinglinks/common/rocketmq/domain/MQMessage.java @@ -0,0 +1,29 @@ +package com.mqttsnet.thinglinks.common.rocketmq.domain; + +import lombok.Data; + +/** + * @Description: MQ消息 + * @Author: ShiHuan SUN + * @E-mail: 13733918655@163.com + * @Website: http://thinglinks.mqttsnet.com + * @CreateDate: 2022/4/15$ 16:15$ + * @UpdateUser: ShiHuan SUN + * @UpdateDate: 2022/4/15$ 16:15$ + * @UpdateRemark: 修改内容 + * @Version: V1.0 + */ +@Data +public class MQMessage { + private static final long serialVersionUID = 1L; + + /** + * 主题 + */ + private String topic; + + /** + * 消息 + */ + private String message; +} diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/init/InitDataBase.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/init/InitDataBase.java new file mode 100644 index 0000000..bafa350 --- /dev/null +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/init/InitDataBase.java @@ -0,0 +1,44 @@ +package com.mqttsnet.thinglinks.link.common.init; + +import com.mqttsnet.thinglinks.link.service.product.ProductService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * @Description: 初始化基础数据 + * @Author: ShiHuan SUN + * @E-mail: 13733918655@163.com + * @Website: http://thinglinks.mqttsnet.com + * @CreateDate: 2022/3/28$ 16:12$ + * @UpdateUser: ShiHuan SUN + * @UpdateDate: 2022/3/28$ 16:12$ + * @UpdateRemark: 修改内容 + * @Version: V1.0 + */ +@Component +@Slf4j +@RefreshScope +public class InitDataBase { + private static InitDataBase InitDataBase; + + @Autowired + private ProductService productService; + + @PostConstruct + public void init() throws Exception { + InitDataBase = this; + InitDataBase.productService=this.productService; + StopWatch watch = new StopWatch(); + watch.start(); + //初始化产品模型数据 + this.productService.createSuperTableDataModel(null); + watch.stop(); + log.info("初始化基础数据成功 ! Time Elapsed (millisecond): {}",watch.getTime()); + } + +} diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/rockermq/consumer/DeviceActionMessageConsumer.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/rockermq/consumer/DeviceActionMessageConsumer.java index 10ab294..261bc99 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/rockermq/consumer/DeviceActionMessageConsumer.java +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/common/rockermq/consumer/DeviceActionMessageConsumer.java @@ -1,6 +1,7 @@ package com.mqttsnet.thinglinks.link.common.rockermq.consumer; import com.alibaba.fastjson.JSONObject; +import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant; import lombok.extern.slf4j.Slf4j; import com.mqttsnet.thinglinks.link.service.device.DeviceActionService; import com.mqttsnet.thinglinks.link.service.device.DeviceDatasService; @@ -22,7 +23,7 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@RocketMQMessageListener(consumerGroup = "thinglinks-link", topic = "thinglinks-link") +@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = "thinglinks-link") public class DeviceActionMessageConsumer implements RocketMQListener { @Autowired private DeviceActionService deviceActionService; diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/controller/product/ProductController.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/controller/product/ProductController.java index 87c0b20..da48e96 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/controller/product/ProductController.java +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/controller/product/ProductController.java @@ -37,7 +37,7 @@ public class ProductController extends BaseController { */ @Resource private ProductService productService; - @Autowired + @Resource private RemoteFileService remoteFileService; /** @@ -182,18 +182,21 @@ public class ProductController extends BaseController { } /** - * 获取超级表模型 + * 初始化生成超级表模型 + * @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品 * @return + * @throws Exception */ - @GetMapping(value = "/findCreateSuperTableDataModel") - public AjaxResult findCreateSuperTableDataModel() - { + @GetMapping(value = "/findCreateSuperTableDataModel/{productId}") + public AjaxResult findCreateSuperTableDataModel(@PathVariable("productId") Long productId) throws Exception { try { - final List superTableDataModel = productService.createSuperTableDataModel(); + final List superTableDataModel = productService.createSuperTableDataModel(productId); return AjaxResult.success(superTableDataModel); }catch (Exception e){ log.error(e.getMessage()); } return AjaxResult.error("产品数据异常,请联系管理员"); } + + } diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/mapper/product/ProductMapper.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/mapper/product/ProductMapper.java index aba3784..0f04846 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/mapper/product/ProductMapper.java +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/mapper/product/ProductMapper.java @@ -138,6 +138,9 @@ public interface ProductMapper { Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(@Param("manufacturerId")String manufacturerId,@Param("model")String model,@Param("protocolType")String protocolType,@Param("status")String status); + Product findOneByIdAndStatus(@Param("id")Long id,@Param("status")String status); + + } \ No newline at end of file diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/ProductService.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/ProductService.java index c0127c4..bb052ef 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/ProductService.java +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/ProductService.java @@ -136,15 +136,21 @@ public interface ProductService{ List findAllByStatus(String status); /** - * 生成超级表模型 - * @return List + * 初始化生成超级表模型 + * @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品 + * @return * @throws Exception */ - List createSuperTableDataModel()throws Exception; + List createSuperTableDataModel(Long productId)throws Exception; Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(String manufacturerId,String model,String protocolType,String status); + + Product findOneByIdAndStatus(Long id,String status); + + + } diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/impl/ProductServiceImpl.java b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/impl/ProductServiceImpl.java index d8b967e..2ab2796 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/impl/ProductServiceImpl.java +++ b/thinglinks-modules/thinglinks-modules-link/src/main/java/com/mqttsnet/thinglinks/link/service/product/impl/ProductServiceImpl.java @@ -17,6 +17,8 @@ import com.mqttsnet.thinglinks.common.core.utils.SpringUtils; import com.mqttsnet.thinglinks.common.core.utils.StringUtils; import com.mqttsnet.thinglinks.common.core.web.domain.AjaxResult; import com.mqttsnet.thinglinks.common.redis.service.RedisService; +import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerTopicConstant; +import com.mqttsnet.thinglinks.common.rocketmq.domain.MQMessage; import com.mqttsnet.thinglinks.common.security.service.TokenService; import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product; import com.mqttsnet.thinglinks.link.api.domain.product.entity.ProductProperties; @@ -38,6 +40,7 @@ import com.mqttsnet.thinglinks.tdengine.api.domain.Fields; import com.mqttsnet.thinglinks.tdengine.api.domain.SuperTableDto; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -93,6 +96,8 @@ public class ProductServiceImpl implements ProductService{ private RemoteTdEngineService remoteTdEngineService; @Autowired private RedisService redisService; + @Autowired + private RocketMQTemplate rocketMQTemplate; /** * 数据库名称 @@ -549,15 +554,23 @@ public class ProductServiceImpl implements ProductService{ /** - * 生成超级表模型 - * @return List + * 初始化生成超级表模型 + * @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品 + * @return * @throws Exception */ @Async @Override - public List createSuperTableDataModel()throws Exception{ + public List createSuperTableDataModel(Long productId)throws Exception{ List superTableDtoList = new ArrayList<>(); - List allByStatus = this.findAllByStatus("0"); + List allByStatus = null; + if (productId == null) { + allByStatus = this.findAllByStatus("0"); + }else { + allByStatus = new ArrayList<>(); + Product product = this.findOneByIdAndStatus(productId,"0"); + allByStatus.add(product); + } SuperTableDto superTableDto; loop: for (Product product : allByStatus) { @@ -622,6 +635,14 @@ public class ProductServiceImpl implements ProductService{ redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto)); log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto)); superTableDtoList.add(superTableDto); + //推送RocketMq消息初始化超级表 + MQMessage mqMessage = new MQMessage(); + mqMessage.setTopic(ConsumerTopicConstant.PRODUCTSUPERTABLE_CREATEORUPDATE); + final JSONObject jsonObject = new JSONObject(); + jsonObject.put("type","create"); + jsonObject.put("msg",JSON.toJSONString(superTableDto)); + mqMessage.setMessage(jsonObject.toJSONString()); + rocketMQTemplate.convertAndSend(mqMessage.getTopic(), mqMessage.getMessage()); } } return superTableDtoList; @@ -632,6 +653,14 @@ public class ProductServiceImpl implements ProductService{ return productMapper.findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(manufacturerId,model,protocolType,status); } + @Override + public Product findOneByIdAndStatus(Long id,String status){ + return productMapper.findOneByIdAndStatus(id,status); + } + + + + diff --git a/thinglinks-modules/thinglinks-modules-link/src/main/resources/mapper/link/product/ProductMapper.xml b/thinglinks-modules/thinglinks-modules-link/src/main/resources/mapper/link/product/ProductMapper.xml index 0eee158..326247f 100644 --- a/thinglinks-modules/thinglinks-modules-link/src/main/resources/mapper/link/product/ProductMapper.xml +++ b/thinglinks-modules/thinglinks-modules-link/src/main/resources/mapper/link/product/ProductMapper.xml @@ -910,4 +910,19 @@ + + + \ No newline at end of file diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/consumer/ProductCreateSuperTableMessageConsumer.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/consumer/ProductCreateSuperTableMessageConsumer.java similarity index 66% rename from thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/consumer/ProductCreateSuperTableMessageConsumer.java rename to thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/consumer/ProductCreateSuperTableMessageConsumer.java index 4137e3e..5de9c46 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/consumer/ProductCreateSuperTableMessageConsumer.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/consumer/ProductCreateSuperTableMessageConsumer.java @@ -1,7 +1,10 @@ -package com.mqttsnet.thinglinks.tdengine.consumer; +package com.mqttsnet.thinglinks.tdengine.common.consumer; import com.alibaba.fastjson.JSONObject; +import com.mqttsnet.thinglinks.common.core.utils.StringUtils; import com.mqttsnet.thinglinks.tdengine.service.ProductSuperTableCreateOrUpdateService; +import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant; +import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerTopicConstant; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; @@ -21,7 +24,7 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@RocketMQMessageListener(consumerGroup = "thinglinks-tdengine", topic = "productSuperTable-createOrUpdate") +@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = ConsumerTopicConstant.PRODUCTSUPERTABLE_CREATEORUPDATE) public class ProductCreateSuperTableMessageConsumer implements RocketMQListener { @Autowired @@ -33,11 +36,18 @@ public class ProductCreateSuperTableMessageConsumer implements RocketMQListener */ @Override public void onMessage(Object message) { - assert message!=null; + if (StringUtils.isNull(message)) { + log.error("消息为空,不处理"); + return; + } JSONObject stableMessage = JSONObject.parseObject(String.valueOf(message)); log.info("TDengine消费{}超级表消息:{}"+stableMessage.get("type")+stableMessage.get("msg")); if("create".equals(stableMessage.get("type"))){ - productSuperTableCreateOrUpdateService.createProductSuperTable(String.valueOf(stableMessage.get("msg"))); + try { + productSuperTableCreateOrUpdateService.createProductSuperTable(String.valueOf(stableMessage.get("msg"))); + } catch (Exception e) { + log.error(e.getMessage()); + } }else if("update".equals(stableMessage.get("type"))){ productSuperTableCreateOrUpdateService.updateProductSuperTable(String.valueOf(stableMessage.get("msg"))); } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/InitDataBase.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/init/InitDataBase.java similarity index 84% rename from thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/InitDataBase.java rename to thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/init/InitDataBase.java index 1119758..d61abe8 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/InitDataBase.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/common/init/InitDataBase.java @@ -1,4 +1,4 @@ -package com.mqttsnet.thinglinks.tdengine.common; +package com.mqttsnet.thinglinks.tdengine.common.init; import com.mqttsnet.thinglinks.tdengine.service.TdEngineService; import lombok.extern.slf4j.Slf4j; @@ -44,10 +44,8 @@ public class InitDataBase { watch.start(); //创建数据库 this.tdEngineService.createDateBase(dataBaseName); - //初始化超级表结构 - this.tdEngineService.initSTableFrame(); watch.stop(); - log.info("初始化数据库及超级表:{} 成功 ! Time Elapsed (millisecond): {}",dataBaseName,watch.getTime()); + log.info("初始化数据库:{} 成功 ! Time Elapsed (millisecond): {}",dataBaseName,watch.getTime()); } } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/mapper/TdEngineMapper.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/mapper/TdEngineMapper.java index cd87364..2e2b6cc 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/mapper/TdEngineMapper.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/mapper/TdEngineMapper.java @@ -45,4 +45,12 @@ public interface TdEngineMapper { @Param("fieldsVo") FieldsVo fieldsVo); Map getCountByTimestamp(SelectDto selectDto); + + /** + * 检查表是否存在 + * @param dataBaseName + * @param tableName 可以为超级表名或普通表名 + * @return + */ + Integer checkTableExists(@Param("dataBaseName") String dataBaseName, @Param("tableName")String tableName); } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/ProductSuperTableCreateOrUpdateService.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/ProductSuperTableCreateOrUpdateService.java index 948e8e7..3c1fdce 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/ProductSuperTableCreateOrUpdateService.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/ProductSuperTableCreateOrUpdateService.java @@ -24,7 +24,7 @@ public interface ProductSuperTableCreateOrUpdateService { * @return */ - void createProductSuperTable(String msg); + void createProductSuperTable(String msg) throws Exception; /** * @Author: ShiHuan Sun diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/TdEngineService.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/TdEngineService.java index 0488504..9fe014c 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/TdEngineService.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/TdEngineService.java @@ -31,5 +31,5 @@ public interface TdEngineService { Long getCountByTimesTamp(SelectDto selectDto); - void initSTableFrame() throws Exception; + void initSTableFrame(String msg) throws Exception; } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/ProductSuperTableCreateOrUpdateServiceImpl.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/ProductSuperTableCreateOrUpdateServiceImpl.java index a74e38f..0f50b29 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/ProductSuperTableCreateOrUpdateServiceImpl.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/ProductSuperTableCreateOrUpdateServiceImpl.java @@ -3,6 +3,7 @@ package com.mqttsnet.thinglinks.tdengine.service.impl; import com.mqttsnet.thinglinks.tdengine.api.domain.ProductSuperTableModel; import com.mqttsnet.thinglinks.tdengine.mapper.ProductSuperTableCreateOrUpdateMapper; import com.mqttsnet.thinglinks.tdengine.service.ProductSuperTableCreateOrUpdateService; +import com.mqttsnet.thinglinks.tdengine.service.TdEngineService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -26,19 +27,14 @@ public class ProductSuperTableCreateOrUpdateServiceImpl implements ProductSuperT @Autowired private ProductSuperTableCreateOrUpdateMapper productSuperTableCreateOrUpdateMapper; + @Autowired + private TdEngineService tdEngineService; @Override - public void createProductSuperTable(String msg) { + public void createProductSuperTable(String msg) throws Exception { //TODO 创建超级表逻辑处理 - productSuperTableCreateOrUpdateMapper.createDB(); - productSuperTableCreateOrUpdateMapper.createSuperTable(); - ProductSuperTableModel productSuperTableModel = new ProductSuperTableModel(); - //ts时间处理 - long ts = System.currentTimeMillis(); - long thirtySec = 1000 * 30; - productSuperTableModel.setTs(new Timestamp(ts + (thirtySec))); - + tdEngineService.initSTableFrame(msg); } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/TdEngineServiceImpl.java b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/TdEngineServiceImpl.java index de22374..b34c9ce 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/TdEngineServiceImpl.java +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/java/com/mqttsnet/thinglinks/tdengine/service/impl/TdEngineServiceImpl.java @@ -1,15 +1,20 @@ package com.mqttsnet.thinglinks.tdengine.service.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.mqttsnet.thinglinks.common.core.constant.Constants; +import com.mqttsnet.thinglinks.common.core.domain.R; +import com.mqttsnet.thinglinks.common.core.enums.DataTypeEnum; import com.mqttsnet.thinglinks.common.core.utils.StringUtils; import com.mqttsnet.thinglinks.common.redis.service.RedisService; -import com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto; -import com.mqttsnet.thinglinks.tdengine.api.domain.TableDto; -import com.mqttsnet.thinglinks.tdengine.api.domain.FieldsVo; +import com.mqttsnet.thinglinks.tdengine.api.domain.*; import com.mqttsnet.thinglinks.tdengine.mapper.TdEngineMapper; import com.mqttsnet.thinglinks.tdengine.service.TdEngineService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; @@ -84,13 +89,51 @@ public class TdEngineServiceImpl implements TdEngineService { return count; } + /** + * 检查数据库表是否存在 + * @param dataBaseName + * @param tableName tableName 可以为超级表名或普通表名 + * @return + */ + public boolean checkTableExists(String dataBaseName,String tableName) { + try { + Integer count = tdEngineMapper.checkTableExists(dataBaseName, tableName); + return count == 1; + } catch (Exception e) { + log.error("检测{}表失败", e.getMessage()); + return true; + } + } + @Override - public void initSTableFrame() throws Exception { - final Object cacheObject = redisService.getCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS); - if (StringUtils.isNull(cacheObject)) { - log.info("The production model cache is empty"); + public void initSTableFrame(String msg) throws Exception { + final SuperTableDto superTableDto = JSONObject.toJavaObject(JSONObject.parseObject(msg), SuperTableDto.class); + //从入参对象获取列字段(超级表结构)对象集合 + List schemaFields = superTableDto.getSchemaFields(); + //从入参对象获取标签字段对象集合 + List tagsFields = superTableDto.getTagsFields(); + //从入参获取数据库名称 + String dataBaseName = superTableDto.getDataBaseName(); + //从入参获取超级表名称 + String superTableName = superTableDto.getSuperTableName(); + final boolean tableExists = this.checkTableExists(dataBaseName, superTableName); + if(tableExists){ + log.info("超级表{}已存在",superTableName); + return; + } + //获取列字段对象集合的第一个对象的字段数据类型 + DataTypeEnum dataType = schemaFields.get(0).getDataType(); + //如果该数据类型不是时间戳,打印和返回报错信息 + if (dataType == null || !"timestamp".equals(dataType.getDataType())) { + log.error("invalid operation: first column must be timestamp"); + return; } - List optionalList = StringUtils.cast(cacheObject); + //将列字段对象集合和标签字段对象集合转码为字段Vo类对象集合 + List schemaFieldsVoList = FieldsVo.fieldsTranscoding(schemaFields); + List tagsFieldsVoList = FieldsVo.fieldsTranscoding(tagsFields); + //创建超级表 + this.createSuperTable(schemaFieldsVoList, tagsFieldsVoList, dataBaseName, superTableName); + log.info("create {} super table success", superTableName); } diff --git a/thinglinks-modules/thinglinks-modules-tdengine/src/main/resources/mapper/TdEngineMapper.xml b/thinglinks-modules/thinglinks-modules-tdengine/src/main/resources/mapper/TdEngineMapper.xml index 70e86ad..3471fd6 100644 --- a/thinglinks-modules/thinglinks-modules-tdengine/src/main/resources/mapper/TdEngineMapper.xml +++ b/thinglinks-modules/thinglinks-modules-tdengine/src/main/resources/mapper/TdEngineMapper.xml @@ -270,4 +270,8 @@ SELECT count(0) AS count FROM #{dataBaseName}.#{tableName} WHERE ${fieldName} BETWEEN #{startTime} AND #{endTime} + + \ No newline at end of file diff --git a/thinglinks-visual/thinglinks-visual-monitor/src/main/java/com/mqttsnet/thinglinks/monitor/config/mq/ConsumerCollection.java b/thinglinks-visual/thinglinks-visual-monitor/src/main/java/com/mqttsnet/thinglinks/monitor/config/mq/ConsumerCollection.java index c8d175e..9992ac3 100644 --- a/thinglinks-visual/thinglinks-visual-monitor/src/main/java/com/mqttsnet/thinglinks/monitor/config/mq/ConsumerCollection.java +++ b/thinglinks-visual/thinglinks-visual-monitor/src/main/java/com/mqttsnet/thinglinks/monitor/config/mq/ConsumerCollection.java @@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import com.mqttsnet.thinglinks.common.core.utils.SpringUtils; +import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant; import com.mqttsnet.thinglinks.monitor.api.domain.*; import com.mqttsnet.thinglinks.monitor.config.mail.MailConfig; import com.mqttsnet.thinglinks.monitor.service.LogInfoService; @@ -30,7 +31,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Component -@RocketMQMessageListener(consumerGroup = "thinglinks-collection", topic = "thinglinks_collection_system") +@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = "thinglinks_collection_system") public class ConsumerCollection implements RocketMQListener { public static final String content_suffix = "

Thinglinks敬上"; -- GitLab