ProductCreateSuperTableMessageConsumer.java 2.3 KB
Newer Older
xiaonannet's avatar
优化  
xiaonannet 已提交
1
package com.mqttsnet.thinglinks.tdengine.common.consumer;
xiaonannet's avatar
xiaonannet 已提交
2 3

import com.alibaba.fastjson.JSONObject;
xiaonannet's avatar
优化  
xiaonannet 已提交
4
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
xiaonannet's avatar
xiaonannet 已提交
5
import com.mqttsnet.thinglinks.tdengine.service.ProductSuperTableCreateOrUpdateService;
xiaonannet's avatar
优化  
xiaonannet 已提交
6 7
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerGroupConstant;
import com.mqttsnet.thinglinks.common.rocketmq.constant.ConsumerTopicConstant;
xiaonannet's avatar
xiaonannet 已提交
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Description: TDengine超级表创键修改动作监听(Rocketmq模式)
 * @Author: ShiHuan Sun
 * @E-mail: 13733918655@163.com
 * @Website: http://thinglinks.mqttsnet.com
 * @CreateDate: 2021/11/22$ 16:11$
 * @UpdateUser: ShiHuan Sun
 * @UpdateDate: 2021/11/22$ 16:11$
 * @UpdateRemark: 修改内容
 * @Version: 1.0
 */
@Slf4j
@Component
xiaonannet's avatar
优化  
xiaonannet 已提交
27
@RocketMQMessageListener(consumerGroup = ConsumerGroupConstant.THINGLINKS_GROUP, topic = ConsumerTopicConstant.PRODUCTSUPERTABLE_CREATEORUPDATE)
xiaonannet's avatar
xiaonannet 已提交
28
public class ProductCreateSuperTableMessageConsumer implements RocketMQListener {
xiaonannet's avatar
xiaonannet 已提交
29 30

    @Autowired
xiaonannet's avatar
xiaonannet 已提交
31
    private ProductSuperTableCreateOrUpdateService productSuperTableCreateOrUpdateService;
xiaonannet's avatar
xiaonannet 已提交
32

xiaonannet's avatar
xiaonannet 已提交
33 34 35 36
    /**
     * 超级表创建及修改处理
     * @param message
     */
xiaonannet's avatar
xiaonannet 已提交
37 38
    @Override
    public void onMessage(Object message) {
xiaonannet's avatar
优化  
xiaonannet 已提交
39 40 41 42
        if (StringUtils.isNull(message)) {
            log.error("消息为空,不处理");
            return;
        }
xiaonannet's avatar
xiaonannet 已提交
43
        JSONObject stableMessage = JSONObject.parseObject(String.valueOf(message));
xiaonannet's avatar
xiaonannet 已提交
44
        log.info("TDengine消费{}超级表消息:{}",stableMessage.get("type"),stableMessage.get("msg"));
xiaonannet's avatar
xiaonannet 已提交
45
        if("create".equals(stableMessage.get("type"))){
xiaonannet's avatar
优化  
xiaonannet 已提交
46 47 48 49 50
            try {
                productSuperTableCreateOrUpdateService.createProductSuperTable(String.valueOf(stableMessage.get("msg")));
            } catch (Exception e) {
                log.error(e.getMessage());
            }
xiaonannet's avatar
xiaonannet 已提交
51
        }else if("update".equals(stableMessage.get("type"))){
xiaonannet's avatar
xiaonannet 已提交
52
            productSuperTableCreateOrUpdateService.updateProductSuperTable(String.valueOf(stableMessage.get("msg")));
xiaonannet's avatar
xiaonannet 已提交
53 54 55 56
        }

    }
}