提交 4c53b503 编写于 作者: xiaonannet's avatar xiaonannet

优化

上级 18b1d6c3
package com.mqttsnet.thinglinks.common.redis.service;
import java.util.*;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* spring redis 工具类
*
......@@ -1621,40 +1619,4 @@ public class RedisService
public Cursor<ZSetOperations.TypedTuple<String>> zScan(String key, ScanOptions options) {
return redisTemplate.opsForZSet().scan(key, options);
}
/**
* 模糊匹配key
*
* @param match
* @param count
* @return
*/
public Cursor<String> scan(String match, int count) {
ScanOptions scanOptions = ScanOptions.scanOptions().match(match).count(count).build();
RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
Cursor cursor = (Cursor) redisTemplate.executeWithStickyConnection((RedisCallback) redisConnection ->
new ConvertingCursor<>(redisConnection.scan(scanOptions), redisSerializer::deserialize));
return cursor;
}
/**
* scan模糊匹配删除
*
* @param match
* @param count
*/
public void scanDelete(String match, int count) {
try {
List<String> keys = Lists.newArrayList();
Cursor<String> cursor = scan(match, count);
while (cursor.hasNext()) {
//找到一次就添加一次
keys.add(cursor.next());
}
cursor.close();
final long deleteObjectCount = deleteObject(keys);
} catch (Exception e) {
log.error("scanDelete error {}", e.getMessage());
}
}
}
......@@ -24,6 +24,12 @@
<version>2.2.1</version>
</dependency>
<!-- thinglinks Common Core-->
<dependency>
<groupId>com.mqttsnet</groupId>
<artifactId>thinglinks-common-core</artifactId>
<version>${thinglinks.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mqttsnet.thinglinks.common.rocketmq.constant;
/**
* @Description: 消费者组常量
* @Author: ShiHuan SUN
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2022/4/15$ 15:53$
* @UpdateUser: ShiHuan SUN
* @UpdateDate: 2022/4/15$ 15:53$
* @UpdateRemark: 修改内容
* @Version: V1.0
*/
public class ConsumerGroupConstant {
/**
* default-consumer-group
*/
public static final String THINGLINKS_GROUP = "thinglinks";
}
package com.mqttsnet.thinglinks.common.rocketmq.constant;
import lombok.Data;
/**
* @Description: 消费者主题常量
* @Author: ShiHuan SUN
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2022/4/15$ 15:53$
* @UpdateUser: ShiHuan SUN
* @UpdateDate: 2022/4/15$ 15:53$
* @UpdateRemark: 修改内容
* @Version: V1.0
*/
@Data
public class ConsumerTopicConstant {
/**
* TDengine超级表创键修改动作监听主题
*/
public static final String PRODUCTSUPERTABLE_CREATEORUPDATE = "productSuperTable-createOrUpdate";
}
package com.mqttsnet.thinglinks.common.rocketmq.domain;
import lombok.Data;
/**
* @Description: MQ消息
* @Author: ShiHuan SUN
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2022/4/15$ 16:15$
* @UpdateUser: ShiHuan SUN
* @UpdateDate: 2022/4/15$ 16:15$
* @UpdateRemark: 修改内容
* @Version: V1.0
*/
@Data
public class MQMessage {
private static final long serialVersionUID = 1L;
/**
* 主题
*/
private String topic;
/**
* 消息
*/
private String message;
}
package com.mqttsnet.thinglinks.link.common.init;
import com.mqttsnet.thinglinks.link.service.product.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @Description: 初始化基础数据
* @Author: ShiHuan SUN
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2022/3/28$ 16:12$
* @UpdateUser: ShiHuan SUN
* @UpdateDate: 2022/3/28$ 16:12$
* @UpdateRemark: 修改内容
* @Version: V1.0
*/
@Component
@Slf4j
@RefreshScope
public class InitDataBase {
private static InitDataBase InitDataBase;
@Autowired
private ProductService productService;
@PostConstruct
public void init() throws Exception {
InitDataBase = this;
InitDataBase.productService=this.productService;
StopWatch watch = new StopWatch();
watch.start();
//初始化产品模型数据
this.productService.createSuperTableDataModel(null);
watch.stop();
log.info("初始化基础数据成功 ! Time Elapsed (millisecond): {}",watch.getTime());
}
}
package com.mqttsnet.thinglinks.link.common.rockermq.consumer;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant;
import lombok.extern.slf4j.Slf4j;
import com.mqttsnet.thinglinks.link.service.device.DeviceActionService;
import com.mqttsnet.thinglinks.link.service.device.DeviceDatasService;
......@@ -22,7 +23,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "thinglinks-link", topic = "thinglinks-link")
@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = "thinglinks-link")
public class DeviceActionMessageConsumer implements RocketMQListener {
@Autowired
private DeviceActionService deviceActionService;
......
......@@ -37,7 +37,7 @@ public class ProductController extends BaseController {
*/
@Resource
private ProductService productService;
@Autowired
@Resource
private RemoteFileService remoteFileService;
/**
......@@ -182,18 +182,21 @@ public class ProductController extends BaseController {
}
/**
* 获取超级表模型
* 初始化生成超级表模型
* @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品
* @return
* @throws Exception
*/
@GetMapping(value = "/findCreateSuperTableDataModel")
public AjaxResult findCreateSuperTableDataModel()
{
@GetMapping(value = "/findCreateSuperTableDataModel/{productId}")
public AjaxResult findCreateSuperTableDataModel(@PathVariable("productId") Long productId) throws Exception {
try {
final List<SuperTableDto> superTableDataModel = productService.createSuperTableDataModel();
final List<SuperTableDto> superTableDataModel = productService.createSuperTableDataModel(productId);
return AjaxResult.success(superTableDataModel);
}catch (Exception e){
log.error(e.getMessage());
}
return AjaxResult.error("产品数据异常,请联系管理员");
}
}
......@@ -138,6 +138,9 @@ public interface ProductMapper {
Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(@Param("manufacturerId")String manufacturerId,@Param("model")String model,@Param("protocolType")String protocolType,@Param("status")String status);
Product findOneByIdAndStatus(@Param("id")Long id,@Param("status")String status);
}
\ No newline at end of file
......@@ -136,15 +136,21 @@ public interface ProductService{
List<Product> findAllByStatus(String status);
/**
* 生成超级表模型
* @return List<SuperTableDto>
* 初始化生成超级表模型
* @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品
* @return
* @throws Exception
*/
List<SuperTableDto> createSuperTableDataModel()throws Exception;
List<SuperTableDto> createSuperTableDataModel(Long productId)throws Exception;
Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(String manufacturerId,String model,String protocolType,String status);
Product findOneByIdAndStatus(Long id,String status);
}
......@@ -17,6 +17,8 @@ import com.mqttsnet.thinglinks.common.core.utils.SpringUtils;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.common.core.web.domain.AjaxResult;
import com.mqttsnet.thinglinks.common.redis.service.RedisService;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerTopicConstant;
import com.mqttsnet.thinglinks.common.rocketmq.domain.MQMessage;
import com.mqttsnet.thinglinks.common.security.service.TokenService;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.ProductProperties;
......@@ -38,6 +40,7 @@ import com.mqttsnet.thinglinks.tdengine.api.domain.Fields;
import com.mqttsnet.thinglinks.tdengine.api.domain.SuperTableDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -93,6 +96,8 @@ public class ProductServiceImpl implements ProductService{
private RemoteTdEngineService remoteTdEngineService;
@Autowired
private RedisService redisService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 数据库名称
......@@ -549,15 +554,23 @@ public class ProductServiceImpl implements ProductService{
/**
* 生成超级表模型
* @return List<SuperTableDto>
* 初始化生成超级表模型
* @param productId productId==null 初始化所有产品:productId!=null 初始化指定产品
* @return
* @throws Exception
*/
@Async
@Override
public List<SuperTableDto> createSuperTableDataModel()throws Exception{
public List<SuperTableDto> createSuperTableDataModel(Long productId)throws Exception{
List<SuperTableDto> superTableDtoList = new ArrayList<>();
List<Product> allByStatus = this.findAllByStatus("0");
List<Product> allByStatus = null;
if (productId == null) {
allByStatus = this.findAllByStatus("0");
}else {
allByStatus = new ArrayList<>();
Product product = this.findOneByIdAndStatus(productId,"0");
allByStatus.add(product);
}
SuperTableDto superTableDto;
loop:
for (Product product : allByStatus) {
......@@ -622,6 +635,14 @@ public class ProductServiceImpl implements ProductService{
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto));
log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto));
superTableDtoList.add(superTableDto);
//推送RocketMq消息初始化超级表
MQMessage mqMessage = new MQMessage();
mqMessage.setTopic(ConsumerTopicConstant.PRODUCTSUPERTABLE_CREATEORUPDATE);
final JSONObject jsonObject = new JSONObject();
jsonObject.put("type","create");
jsonObject.put("msg",JSON.toJSONString(superTableDto));
mqMessage.setMessage(jsonObject.toJSONString());
rocketMQTemplate.convertAndSend(mqMessage.getTopic(), mqMessage.getMessage());
}
}
return superTableDtoList;
......@@ -632,6 +653,14 @@ public class ProductServiceImpl implements ProductService{
return productMapper.findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(manufacturerId,model,protocolType,status);
}
@Override
public Product findOneByIdAndStatus(Long id,String status){
return productMapper.findOneByIdAndStatus(id,status);
}
......
......@@ -910,4 +910,19 @@
</if>
</where>
</select>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-04-15-->
<select id="findOneByIdAndStatus" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product
<where>
<if test="id != null">
and id=#{id,jdbcType=BIGINT}
</if>
<if test="status != null">
and `status`=#{status,jdbcType=VARCHAR}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
package com.mqttsnet.thinglinks.tdengine.consumer;
package com.mqttsnet.thinglinks.tdengine.common.consumer;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.tdengine.service.ProductSuperTableCreateOrUpdateService;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerTopicConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
......@@ -21,7 +24,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "thinglinks-tdengine", topic = "productSuperTable-createOrUpdate")
@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = ConsumerTopicConstant.PRODUCTSUPERTABLE_CREATEORUPDATE)
public class ProductCreateSuperTableMessageConsumer implements RocketMQListener {
@Autowired
......@@ -33,11 +36,18 @@ public class ProductCreateSuperTableMessageConsumer implements RocketMQListener
*/
@Override
public void onMessage(Object message) {
assert message!=null;
if (StringUtils.isNull(message)) {
log.error("消息为空,不处理");
return;
}
JSONObject stableMessage = JSONObject.parseObject(String.valueOf(message));
log.info("TDengine消费{}超级表消息:{}"+stableMessage.get("type")+stableMessage.get("msg"));
if("create".equals(stableMessage.get("type"))){
productSuperTableCreateOrUpdateService.createProductSuperTable(String.valueOf(stableMessage.get("msg")));
try {
productSuperTableCreateOrUpdateService.createProductSuperTable(String.valueOf(stableMessage.get("msg")));
} catch (Exception e) {
log.error(e.getMessage());
}
}else if("update".equals(stableMessage.get("type"))){
productSuperTableCreateOrUpdateService.updateProductSuperTable(String.valueOf(stableMessage.get("msg")));
}
......
package com.mqttsnet.thinglinks.tdengine.common;
package com.mqttsnet.thinglinks.tdengine.common.init;
import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
import lombok.extern.slf4j.Slf4j;
......@@ -44,10 +44,8 @@ public class InitDataBase {
watch.start();
//创建数据库
this.tdEngineService.createDateBase(dataBaseName);
//初始化超级表结构
this.tdEngineService.initSTableFrame();
watch.stop();
log.info("初始化数据库及超级表:{} 成功 ! Time Elapsed (millisecond): {}",dataBaseName,watch.getTime());
log.info("初始化数据库:{} 成功 ! Time Elapsed (millisecond): {}",dataBaseName,watch.getTime());
}
}
......@@ -45,4 +45,12 @@ public interface TdEngineMapper {
@Param("fieldsVo") FieldsVo fieldsVo);
Map<String, Long> getCountByTimestamp(SelectDto selectDto);
/**
* 检查表是否存在
* @param dataBaseName
* @param tableName 可以为超级表名或普通表名
* @return
*/
Integer checkTableExists(@Param("dataBaseName") String dataBaseName, @Param("tableName")String tableName);
}
......@@ -24,7 +24,7 @@ public interface ProductSuperTableCreateOrUpdateService {
* @return
*/
void createProductSuperTable(String msg);
void createProductSuperTable(String msg) throws Exception;
/**
* @Author: ShiHuan Sun
......
......@@ -31,5 +31,5 @@ public interface TdEngineService {
Long getCountByTimesTamp(SelectDto selectDto);
void initSTableFrame() throws Exception;
void initSTableFrame(String msg) throws Exception;
}
......@@ -3,6 +3,7 @@ package com.mqttsnet.thinglinks.tdengine.service.impl;
import com.mqttsnet.thinglinks.tdengine.api.domain.ProductSuperTableModel;
import com.mqttsnet.thinglinks.tdengine.mapper.ProductSuperTableCreateOrUpdateMapper;
import com.mqttsnet.thinglinks.tdengine.service.ProductSuperTableCreateOrUpdateService;
import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -26,19 +27,14 @@ public class ProductSuperTableCreateOrUpdateServiceImpl implements ProductSuperT
@Autowired
private ProductSuperTableCreateOrUpdateMapper productSuperTableCreateOrUpdateMapper;
@Autowired
private TdEngineService tdEngineService;
@Override
public void createProductSuperTable(String msg) {
public void createProductSuperTable(String msg) throws Exception {
//TODO 创建超级表逻辑处理
productSuperTableCreateOrUpdateMapper.createDB();
productSuperTableCreateOrUpdateMapper.createSuperTable();
ProductSuperTableModel productSuperTableModel = new ProductSuperTableModel();
//ts时间处理
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
productSuperTableModel.setTs(new Timestamp(ts + (thirtySec)));
tdEngineService.initSTableFrame(msg);
}
......
package com.mqttsnet.thinglinks.tdengine.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.common.core.constant.Constants;
import com.mqttsnet.thinglinks.common.core.domain.R;
import com.mqttsnet.thinglinks.common.core.enums.DataTypeEnum;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.common.redis.service.RedisService;
import com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto;
import com.mqttsnet.thinglinks.tdengine.api.domain.TableDto;
import com.mqttsnet.thinglinks.tdengine.api.domain.FieldsVo;
import com.mqttsnet.thinglinks.tdengine.api.domain.*;
import com.mqttsnet.thinglinks.tdengine.mapper.TdEngineMapper;
import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
......@@ -84,13 +89,51 @@ public class TdEngineServiceImpl implements TdEngineService {
return count;
}
/**
* 检查数据库表是否存在
* @param dataBaseName
* @param tableName tableName 可以为超级表名或普通表名
* @return
*/
public boolean checkTableExists(String dataBaseName,String tableName) {
try {
Integer count = tdEngineMapper.checkTableExists(dataBaseName, tableName);
return count == 1;
} catch (Exception e) {
log.error("检测{}表失败", e.getMessage());
return true;
}
}
@Override
public void initSTableFrame() throws Exception {
final Object cacheObject = redisService.getCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS);
if (StringUtils.isNull(cacheObject)) {
log.info("The production model cache is empty");
public void initSTableFrame(String msg) throws Exception {
final SuperTableDto superTableDto = JSONObject.toJavaObject(JSONObject.parseObject(msg), SuperTableDto.class);
//从入参对象获取列字段(超级表结构)对象集合
List<Fields> schemaFields = superTableDto.getSchemaFields();
//从入参对象获取标签字段对象集合
List<Fields> tagsFields = superTableDto.getTagsFields();
//从入参获取数据库名称
String dataBaseName = superTableDto.getDataBaseName();
//从入参获取超级表名称
String superTableName = superTableDto.getSuperTableName();
final boolean tableExists = this.checkTableExists(dataBaseName, superTableName);
if(tableExists){
log.info("超级表{}已存在",superTableName);
return;
}
//获取列字段对象集合的第一个对象的字段数据类型
DataTypeEnum dataType = schemaFields.get(0).getDataType();
//如果该数据类型不是时间戳,打印和返回报错信息
if (dataType == null || !"timestamp".equals(dataType.getDataType())) {
log.error("invalid operation: first column must be timestamp");
return;
}
List<Optional> optionalList = StringUtils.cast(cacheObject);
//将列字段对象集合和标签字段对象集合转码为字段Vo类对象集合
List<FieldsVo> schemaFieldsVoList = FieldsVo.fieldsTranscoding(schemaFields);
List<FieldsVo> tagsFieldsVoList = FieldsVo.fieldsTranscoding(tagsFields);
//创建超级表
this.createSuperTable(schemaFieldsVoList, tagsFieldsVoList, dataBaseName, superTableName);
log.info("create {} super table success", superTableName);
}
......
......@@ -270,4 +270,8 @@
SELECT count(0) AS count
FROM #{dataBaseName}.#{tableName} WHERE ${fieldName} BETWEEN #{startTime} AND #{endTime}
</select>
<select id="checkTableExists" resultType="java.lang.Integer">
SELECT COUNT(0) FROM #{dataBaseName}.#{tableName}
</select>
</mapper>
\ No newline at end of file
......@@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.mqttsnet.thinglinks.common.core.utils.SpringUtils;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant;
import com.mqttsnet.thinglinks.monitor.api.domain.*;
import com.mqttsnet.thinglinks.monitor.config.mail.MailConfig;
import com.mqttsnet.thinglinks.monitor.service.LogInfoService;
......@@ -30,7 +31,7 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "thinglinks-collection", topic = "thinglinks_collection_system")
@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = "thinglinks_collection_system")
public class ConsumerCollection implements RocketMQListener {
public static final String content_suffix = "<p><a target='_blank' href='http://www.wgstart.com'>Thinglinks</a>敬上";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册