提交 26050f23 编写于 作者: xiaonannet's avatar xiaonannet

superTableCreateOrUpdate

上级 eb3110e4
......@@ -32,7 +32,7 @@ public class DeviceActionMessageConsumer implements RocketMQListener {
@Override
public void onMessage(Object message) {
assert message!=null;
System.out.println("Link消费设备消息"+message);
log.info("Link消费设备消息"+message);
JSONObject thinglinksMessage = JSONObject.parseObject(String.valueOf(message));
/**
* TODO 设备上下线处理
......@@ -41,7 +41,7 @@ public class DeviceActionMessageConsumer implements RocketMQListener {
* ${topic} 其他为业务数据自行处理
*/
if("$event/connect".equals(thinglinksMessage.get("topic"))){
deviceActionService.connectEvent(String.valueOf(thinglinksMessage.get("msg").toString()));
deviceActionService.connectEvent(String.valueOf(thinglinksMessage.get("msg")));
}else if("$event/close".equals(thinglinksMessage.get("topic"))){
deviceActionService.closeEvent(String.valueOf(thinglinksMessage.get("msg")));
}else {
......
package com.mqttsnet.thinglinks.tdengine.common.rockermq.consumer;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.tdengine.service.SuperTableCreateOrUpdateService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Description: TDengine超级表创键监听(Rocketmq模式)
* @Description: TDengine超级表创键修改动作监听(Rocketmq模式)
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
......@@ -19,14 +21,22 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "thinglinks-tdengine", topic = "create-stable")
public class CreateStableMessageConsumer implements RocketMQListener {
@RocketMQMessageListener(consumerGroup = "thinglinks-tdengine", topic = "superTable-createOrUpdate")
public class CreateSuperTableMessageConsumer implements RocketMQListener {
@Autowired
private SuperTableCreateOrUpdateService superTableCreateOrUpdateService;
@Override
public void onMessage(Object message) {
assert message!=null;
System.out.println("TDengine消费创键超级表消息"+message);
JSONObject stableMessage = JSONObject.parseObject(String.valueOf(message));
log.info("TDengine消费{}超级表消息:{}"+stableMessage.get("type")+stableMessage.get("msg"));
if("create".equals(stableMessage.get("type"))){
superTableCreateOrUpdateService.create(String.valueOf(stableMessage.get("msg")));
}else if("update".equals(stableMessage.get("type"))){
superTableCreateOrUpdateService.update(String.valueOf(stableMessage.get("msg")));
}
}
}
package com.mqttsnet.thinglinks.tdengine.mapper;
import org.apache.ibatis.annotations.Mapper;
/**
* @Description: 超级表创建及修改持久层接口
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2021/12/26$ 22:17$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/12/26$ 22:17$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Mapper
public interface SuperTableCreateOrUpdateMapper {
void dropDB();
void createDB();
void createSuperTable();
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mqttsnet.thinglinks.tdengine.mapper.SuperTableCreateOrUpdateMapper">
<update id="dropDB">
drop
database if exists thinglinks
</update>
<update id="createDB">
create
database if not exists thinglinks
</update>
<update id="createSuperTable" parameterType="HashMap">
create table if not exists thinglinks.${SuperTableName}(
<foreach collection="columns" item="item" open="(" separator="," close=")">
#{item,jdbcType=VARBINARY}
</foreach>
)
tags(
<foreach collection="tags" item="item" open="(" separator="," close=")">
#{item,jdbcType=VARBINARY}
</foreach>
)
</update>
</mapper>
\ No newline at end of file
......@@ -11,21 +11,21 @@
<select id="lastOne" resultType="java.util.Map">
select last_row(*), location, groupid
from test.weather
from thinglinks.weather
</select>
<update id="dropDB">
drop
database if exists test
database if exists thinglinks
</update>
<update id="createDB">
create
database if not exists test
database if not exists thinglinks
</update>
<update id="createSuperTable">
create table if not exists test.weather
create table if not exists thinglinks.weather
(
ts
timestamp,
......@@ -46,7 +46,7 @@
</update>
<update id="createTable" parameterType="com.mqttsnet.thinglinks.tdengine.domain.Weather">
create table if not exists test.t#{groupId} using test.weather tags
create table if not exists thinglinks.t#{groupId} using thinglinks.weather tags
(
#{location},
#{groupId}
......@@ -54,28 +54,28 @@
</update>
<select id="select" resultMap="BaseResultMap">
select * from test.weather order by ts desc
<if test="limit != null">
select * from thinglinks.weather order by ts desc
<if thinglinks="limit != null">
limit #{limit,jdbcType=BIGINT}
</if>
<if test="offset != null">
<if thinglinks="offset != null">
offset #{offset,jdbcType=BIGINT}
</if>
</select>
<insert id="insert" parameterType="com.mqttsnet.thinglinks.tdengine.domain.Weather">
insert into test.t#{groupId} (ts, temperature, humidity, note)
insert into thinglinks.t#{groupId} (ts, temperature, humidity, note)
values (#{ts}, ${temperature}, ${humidity}, #{note})
</insert>
<select id="getSubTables" resultType="String">
select tbname
from test.weather
from thinglinks.weather
</select>
<select id="count" resultType="int">
select count(*)
from test.weather
from thinglinks.weather
</select>
<resultMap id="avgResultSet" type="com.mqttsnet.thinglinks.tdengine.domain.Weather">
......@@ -86,7 +86,7 @@
<select id="avg" resultMap="avgResultSet">
select avg(temperature), avg(humidity)
from test.weather interval(1m)
from thinglinks.weather interval(1m)
</select>
</mapper>
\ No newline at end of file
package com.mqttsnet.thinglinks.tdengine.service;
/**
* @Description: java类作用描述
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2021/12/26$ 21:51$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/12/26$ 21:51$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public interface SuperTableCreateOrUpdateService {
/**
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Description: 创建超级表
* @CreateDate: 2021/12/26 22:05
* @Version: V1.0
* @Param:
* msg 产品模型信息
* @return
*/
void create(String msg);
/**
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Description: 修改超级表
* @CreateDate: 2021/12/26 22:05
* @Version: V1.0
* @Param:
* msg 产品模型信息
* @return
*/
void update(String msg);
}
package com.mqttsnet.thinglinks.tdengine.service;
import com.mqttsnet.thinglinks.tdengine.mapper.WeatherMapper;
import com.mqttsnet.thinglinks.tdengine.domain.Weather;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Random;
@Service
public class WeatherService {
/**
* @Description: java类作用描述
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2021/12/26$ 21:52$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/12/26$ 21:52$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public interface WeatherService {
@Autowired
private WeatherMapper weatherMapper;
private Random random = new Random(System.currentTimeMillis());
private String[] locations = {"北京", "上海", "广州", "深圳", "天津"};
int init();
public int init() {
weatherMapper.dropDB();
weatherMapper.createDB();
weatherMapper.createSuperTable();
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
int count = 0;
for (int i = 0; i < 20; i++) {
Weather weather = new Weather(new Timestamp(ts + (thirtySec * i)), 30 * random.nextFloat(), random.nextInt(100));
weather.setLocation(locations[random.nextInt(locations.length)]);
weather.setGroupId(i % locations.length);
weather.setNote("note-" + i);
weatherMapper.createTable(weather);
count += weatherMapper.insert(weather);
}
return count;
}
int count();
public List<Weather> query(Long limit, Long offset) {
return weatherMapper.select(limit, offset);
}
List<Weather> query(Long limit, Long offset);
public int save(float temperature, float humidity) {
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
Weather weather = new Weather();
weather.setTs(new Timestamp(ts + (thirtySec)));
weather.setTemperature(temperature);
weather.setHumidity(humidity);
weather.setNote("1");
return weatherMapper.insert(weather);
}
int save(float temperature, float humidity);
public int count() {
return weatherMapper.count();
}
List<String> getSubTables();
public List<String> getSubTables() {
return weatherMapper.getSubTables();
}
List<Weather> avg();
public List<Weather> avg() {
return weatherMapper.avg();
}
public Weather lastOne() {
Map<String, Object> result = weatherMapper.lastOne();
long ts = (long) result.get("ts");
float temperature = (float) result.get("temperature");
float humidity = (float) result.get("humidity");
String note = (String) result.get("note");
int groupId = (int) result.get("groupid");
String location = (String) result.get("location");
Weather weather = new Weather(new Timestamp(ts), temperature, humidity);
weather.setNote(note);
weather.setGroupId(groupId);
weather.setLocation(location);
return weather;
}
Weather lastOne();
}
package com.mqttsnet.thinglinks.tdengine.service.impl;
import com.mqttsnet.thinglinks.tdengine.mapper.SuperTableCreateOrUpdateMapper;
import com.mqttsnet.thinglinks.tdengine.service.SuperTableCreateOrUpdateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Description: java类作用描述
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2021/12/26$ 21:51$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/12/26$ 21:51$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Service
@Slf4j
public class SuperTableCreateOrUpdateServiceImpl implements SuperTableCreateOrUpdateService {
@Autowired
private SuperTableCreateOrUpdateMapper superTableCreateOrUpdateMapper;
@Override
public void create(String msg) {
//TODO 创建超级表逻辑处理
superTableCreateOrUpdateMapper.createDB();
superTableCreateOrUpdateMapper.createSuperTable();
}
@Override
public void update(String msg) {
}
}
package com.mqttsnet.thinglinks.tdengine.service.impl;
import com.mqttsnet.thinglinks.tdengine.mapper.WeatherMapper;
import com.mqttsnet.thinglinks.tdengine.domain.Weather;
import com.mqttsnet.thinglinks.tdengine.service.WeatherService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Random;
@Service
@Slf4j
public class WeatherServiceImpl implements WeatherService {
@Autowired
private WeatherMapper weatherMapper;
private Random random = new Random(System.currentTimeMillis());
private String[] locations = {"北京", "上海", "广州", "深圳", "天津"};
@Override
public int init() {
weatherMapper.dropDB();
weatherMapper.createDB();
weatherMapper.createSuperTable();
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
int count = 0;
for (int i = 0; i < 20; i++) {
Weather weather = new Weather(new Timestamp(ts + (thirtySec * i)), 30 * random.nextFloat(), random.nextInt(100));
weather.setLocation(locations[random.nextInt(locations.length)]);
weather.setGroupId(i % locations.length);
weather.setNote("note-" + i);
weatherMapper.createTable(weather);
count += weatherMapper.insert(weather);
}
return count;
}
@Override
public List<Weather> query(Long limit, Long offset) {
return weatherMapper.select(limit, offset);
}
@Override
public int save(float temperature, float humidity) {
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
Weather weather = new Weather();
weather.setTs(new Timestamp(ts + (thirtySec)));
weather.setTemperature(temperature);
weather.setHumidity(humidity);
weather.setNote("1");
return weatherMapper.insert(weather);
}
@Override
public int count() {
return weatherMapper.count();
}
@Override
public List<String> getSubTables() {
return weatherMapper.getSubTables();
}
@Override
public List<Weather> avg() {
return weatherMapper.avg();
}
@Override
public Weather lastOne() {
Map<String, Object> result = weatherMapper.lastOne();
long ts = (long) result.get("ts");
float temperature = (float) result.get("temperature");
float humidity = (float) result.get("humidity");
String note = (String) result.get("note");
int groupId = (int) result.get("groupid");
String location = (String) result.get("location");
Weather weather = new Weather(new Timestamp(ts), temperature, humidity);
weather.setNote(note);
weather.setGroupId(groupId);
weather.setLocation(location);
return weather;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册