提交 6891c40c 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

......@@ -118,3 +118,4 @@ contrib/*
!contrib/test
sql
debug*/
.env
\ No newline at end of file
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 8a5e336
GIT_TAG 3c7dafe
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 648cc62
GIT_TAG 29424d5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -2,6 +2,7 @@ package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -10,7 +11,7 @@ func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
defer conn.Close()
if err != nil {
fmt.Println("failed to connect, err:", err)
log.Fatalln("failed to connect, err:", err)
} else {
fmt.Println("connected")
}
......
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
......@@ -11,7 +12,7 @@ func main() {
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
log.Fatalln("failed to connect TDengine, err:", err)
return
}
fmt.Println("Connected")
......
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
......@@ -11,7 +12,7 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
log.Fatalln("failed to connect TDengine, err:", err)
return
}
fmt.Println("Connected")
......
package main
import (
"fmt"
"github.com/taosdata/driver-go/v3/wrapper"
)
func main() {
conn, err := wrapper.TaosConnect("localhost", "root", "taosdata", "", 6030)
defer wrapper.TaosClose(conn)
if err != nil {
fmt.Println("fail to connect, err:", err)
} else {
fmt.Println("connected")
}
}
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
......@@ -32,6 +32,6 @@ func main() {
err = conn.OpenTSDBInsertJsonPayload(payload)
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -2,6 +2,7 @@ package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -33,6 +34,6 @@ func main() {
err = conn.InfluxDBInsertLines(lines, "ms")
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
......@@ -10,28 +11,26 @@ import (
func createStable(taos *sql.DB) {
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
fmt.Println("failed to create database, err:", err)
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
fmt.Println("failed to create stable, err:", err)
log.Fatalln("failed to create stable, err:", err)
}
}
func insertData(taos *sql.DB) {
sql := `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
sql := `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
result, err := taos.Exec(sql)
if err != nil {
fmt.Println("failed to insert, err:", err)
return
log.Fatalln("failed to insert, err:", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
fmt.Println("failed to get affected rows, err:", err)
return
log.Fatalln("failed to get affected rows, err:", err)
}
fmt.Println("RowsAffected", rowsAffected)
}
......@@ -40,8 +39,7 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
createStable(taos)
......
......@@ -5,8 +5,8 @@ import (
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/param"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func checkErr(err error, prompt string) {
......
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
......@@ -37,6 +37,6 @@ func main() {
err = conn.OpenTSDBInsertTelnetLines(lines)
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -2,7 +2,7 @@ package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
......@@ -12,14 +12,12 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/power"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
log.Fatalln("failed to select from table, err:", err)
}
defer rows.Close()
......@@ -30,9 +28,9 @@ func main() {
}
err := rows.Scan(&r.ts, &r.current)
if err != nil {
fmt.Println("scan error:\n", err)
log.Fatalln("scan error:\n", err)
return
}
fmt.Println(r.ts, r.current)
log.Fatalln(r.ts, r.current)
}
}
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" });
const cursor = conn.cursor();
......@@ -18,4 +18,3 @@ try {
conn.close();
}, 2000);
}
// bug here: jira 14506
const taos = require("td2.0-connector");
const { options, connect } = require("@tdengine/rest");
var conn = taos.connect({
host: "localhost",
port: 6030,
user: "root",
password: "taosdata",
});
conn.close();
async function test() {
options.path = "/rest/sql";
options.host = "localhost";
let conn = connect(options);
let cursor = conn.cursor();
try {
let res = await cursor.query("SELECT server_version()");
res.toString();
} catch (err) {
console.log(err);
}
}
test();
// run with: node connect.js
// output:
// Successfully connected to TDengine
// server_version() |
// ===================
// 3.0.0.0 |
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......@@ -11,11 +11,11 @@ try {
cursor.execute(
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
);
var sql = `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`;
cursor.execute(sql);
var sql = `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`;
cursor.execute(sql,{'quiet':false});
} finally {
cursor.close();
conn.close();
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......@@ -24,10 +24,10 @@ function insertData() {
);
// bind table name and tags
let tagBind = new taos.TaosBind(2);
tagBind.bindBinary("California.SanFrancisco");
tagBind.bindInt(2);
cursor.stmtSetTbnameTags("d1001", tagBind.getBind());
let tagBind = new taos.TaosMultiBindArr(2);
tagBind.multiBindBinary(["California.SanFrancisco"]);
tagBind.multiBindInt([2]);
cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());
// bind values
let valueBind = new taos.TaosMultiBindArr(4);
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
......@@ -23,25 +23,22 @@ function insertData() {
);
// bind table name and tags
let tagBind = new taos.TaosBind(2);
tagBind.bindBinary("California.SanFrancisco");
tagBind.bindInt(2);
cursor.stmtSetTbnameTags("d1001", tagBind.getBind());
let tagBind = new taos.TaosMultiBindArr(2);
tagBind.multiBindBinary(["California.SanFrancisco"]);
tagBind.multiBindInt([2]);
cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());
// bind values
let rows = [
[1648432611249, 10.3, 219, 0.31],
[1648432611749, 12.6, 218, 0.33],
];
for (let row of rows) {
let valueBind = new taos.TaosBind(4);
valueBind.bindTimestamp(row[0]);
valueBind.bindFloat(row[1]);
valueBind.bindInt(row[2]);
valueBind.bindFloat(row[3]);
cursor.stmtBindParam(valueBind.getBind());
cursor.stmtAddBatch();
}
let rows = [[1648432611249, 1648432611749], [10.3, 12.6], [219, 218], [0.31, 0.33]];
let valueBind = new taos.TaosMultiBindArr(4);
valueBind.multiBindTimestamp(rows[0]);
valueBind.multiBindFloat(rows[1]);
valueBind.multiBindInt(rows[2]);
valueBind.multiBindFloat(rows[3]);
cursor.stmtBindParamBatch(valueBind.getMultiBindArr());
cursor.stmtAddBatch();
// execute
cursor.stmtExecute();
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" });
const cursor = conn.cursor();
......@@ -9,8 +9,6 @@ query.execute().then(function (result) {
// output:
// Successfully connected to TDengine
// Query OK, 2 row(s) in set (0.00317767s)
// ts | current |
// =======================================================
// 2018-10-03 14:38:05.000 | 10.3 |
......
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" });
// 未完成
\ No newline at end of file
var cursor = conn.cursor();
function runConsumer() {
// create topic
cursor.execute("create topic topic_name_example as select * from meters");
let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'msg.with.table.name': 'true',
'enable.auto.commit': 'true'
});
// subscribe the topic just created.
consumer.subscribe("topic_name_example");
// get subscribe topic list
let topicList = consumer.subscription();
console.log(topicList);
for (let i = 0; i < 5; i++) {
let msg = consumer.consume(100);
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
consumer.unsubscribe();
consumer.close();
// drop topic
cursor.execute("drop topic topic_name_example");
}
try {
runConsumer();
} finally {
setTimeout(() => {
cursor.close();
conn.close();
}, 2000);
}
\ No newline at end of file
......@@ -4,7 +4,7 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
"td2.0-connector": "^2.0.12",
"td2.0-rest-connector": "^1.0.0"
"@tdengine/client": "^3.0.0",
"@tdengine/rest": "^3.0.0"
}
}
const { options, connect } = require("td2.0-rest-connector");
const { options, connect } = require("@tdengine/rest");
async function test() {
options.path = "/rest/sqlt";
......@@ -17,4 +17,4 @@ test();
// output:
// server_version() |
// ===================
// 2.4.0.12 |
// 3.0.0.0 |
......@@ -3,6 +3,6 @@
```
:::tip
driver-go 的模块 `github.com/taosdata/driver-go/v2/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
driver-go 的模块 `github.com/taosdata/driver-go/v3/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
:::
---
sidebar_label: 消息队列
description: "数据订阅与推送服务。连续写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 消息队列
sidebar_label: 数据订阅
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 数据订阅
---
基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本
TDengine 内嵌支持消息订阅与推送服务(下文都简称TMQ)。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表,或整个库。客户端启动订阅后,定时或按需轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量
TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上,支持自动提交和手动提交。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。
为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
## 主要数据结构和API
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
......@@ -47,7 +53,9 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到。
一、首先完成建库、建一张超级表和多张子表,并每个子表插入若干条数据记录:
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
```sql
drop database if exists tmqdb;
......@@ -63,14 +71,15 @@ insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33');
```
二、创建topic:
## 创建topic:
```sql
create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
```
注:TMQ支持多种订阅类型:
1、列订阅
TMQ支持多种订阅类型:
### 列订阅
语法:CREATE TOPIC topic_name as subquery
通过select语句订阅(包括select *,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
......@@ -79,25 +88,18 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
- 被订阅或用于计算的column和tag不可被删除、修改
- 若发生schema变更,新增的column不出现在结果中
2、超级表订阅
### 超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
- 订阅某超级表的全部数据,schema变更不受限,schema变更后写入的数据将以最新schema返回
- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样
- 列变更后写入的数据若未落盘,将以写入时的schema返回
- 列变更后写入的数据若已落盘,将以落盘时的schema返回
3、db订阅
语法:CREATE TOPIC topic_name AS DATABASE db_name
- 订阅某一db的全部数据,schema变更不受限
- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样
- 列变更后写入的数据若未落盘,将以写入时的schema返回
- 列变更后写入的数据若已落盘,将以落盘时的schema返回
与select * from stbName订阅的区别是:
- 不会限制用户的schema变更
- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
- 返回数据不带有tag
三、创建consumer
## 创建 consumer 以及consumer group
目前支持的config
对于consumer, 目前支持的config包括
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
......@@ -121,7 +123,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -131,7 +133,12 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return tmq;
```
四、创建订阅主题列表
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
## 创建 topic 列表
单个consumer支持同时订阅多个topic。
```sql
tmq_list_t* topicList = tmq_list_new();
......@@ -139,9 +146,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return topicList;
```
单个consumer支持同时订阅多个topic。
五、启动订阅并开始消费
## 启动订阅并开始消费
```sql
/* 启动订阅 */
......@@ -151,9 +156,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
/* 循环poll消息 */
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t consumeDelay = 5000;
int32_t timeOut = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay);
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
......@@ -190,7 +195,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
}
......@@ -199,7 +204,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
}
```
五、结束消费
## 结束消费
```sql
/* 取消订阅 */
......@@ -209,7 +214,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_consumer_close(tmq);
```
六、删除topic
## 删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
......@@ -218,7 +223,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
drop topic topicName;
```
七、状态查看
## 状态查看
1、topics:查询已经创建的topic
......
......@@ -15,11 +15,11 @@ import NodeOpenTSDBTelnet from "../../07-develop/03-insert-data/_js_opts_telnet.
import NodeOpenTSDBJson from "../../07-develop/03-insert-data/_js_opts_json.mdx";
import NodeQuery from "../../07-develop/04-query-data/_js.mdx";
`td2.0-connector` 和 `td2.0-rest-connector` 是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件
`@tdengine/client` 和 `@tdengine/rest` 是 TDengine 的官方 Node.js 语言连接器。 Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件。注意:从 TDengine 3.0 开始 Node.js 原生连接器的包名由 `td2.0-connector` 改名为 `@tdengine/client` 而 rest 连接器的包名由 `td2.0-rest-connector` 改为 `@tdengine/rest`。并且不与 TDengine 2.x 兼容
`td2.0-connector` 是**原生连接器**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。`td2.0-rest-connector` 是 **REST 连接器**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 的运行实例。REST 连接器可以在任何平台运行,但性能略为下降,接口实现的功能特性集合和原生接口有少量不同。
`@tdengine/client` 是**原生连接器**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。`@tdengine/rest` 是 **REST 连接器**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 的运行实例。REST 连接器可以在任何平台运行,但性能略为下降,接口实现的功能特性集合和原生接口有少量不同。
Node.js 连接器源码托管在 [GitHub](https://github.com/taosdata/taos-connector-node)。
Node.js 连接器源码托管在 [GitHub](https://github.com/taosdata/taos-connector-node/tree/3.0)。
## 支持的平台
......@@ -58,7 +58,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
<TabItem value="Linux" label="Linux 系统安装依赖工具">
- `python` (建议`v2.7` , `v3.x.x` 目前还不支持)
- `td2.0-connector` 2.0.6 支持 Node.js LTS v10.9.0 或更高版本, Node.js LTS v12.8.0 或更高版本;2.0.5 及更早版本支持 Node.js LTS v10.x 版本。其他版本可能存在包兼容性的问题
- `@tdengine/client` 3.0.0 支持 Node.js LTS v10.9.0 或更高版本, Node.js LTS v12.8.0 或更高版本;其他版本可能存在包兼容性的问题
- `make`
- C 语言编译器,[GCC](https://gcc.gnu.org) v4.8.5 或更高版本
......@@ -90,14 +90,14 @@ REST 连接器支持所有能运行 Node.js 的平台。
<TabItem value="install_native" label="安装原生连接器">
```bash
npm install td2.0-connector
npm install @tdengine/client
```
</TabItem>
<TabItem value="install_rest" label="安装 REST 连接器">
```bash
npm i td2.0-rest-connector
npm install @tdengine/rest
```
</TabItem>
......@@ -109,13 +109,13 @@ npm i td2.0-rest-connector
验证方法:
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/develop/examples/nodejs/nodejsChecker.js)到本地。
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/nodejsChecker.js)到本地。
- 在命令行中执行以下命令。
```bash
npm init -y
npm install td2.0-connector
npm install @tdengine/client
node nodejsChecker.js host=localhost
```
......@@ -128,11 +128,11 @@ node nodejsChecker.js host=localhost
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
安装并引用 `td2.0-connector` 包。
安装并引用 `@tdengine/client` 包。
```javascript
//A cursor also needs to be initialized in order to interact with TDengine from Node.js.
const taos = require("td2.0-connector");
const taos = require("@tdengine/client");
var conn = taos.connect({
host: "127.0.0.1",
user: "root",
......@@ -149,12 +149,12 @@ conn.close();
</TabItem>
<TabItem value="rest" label="REST 连接">
安装并引用 `td2.0-rest-connector` 包。
安装并引用 `@tdengine/rest` 包。
```javascript
//A cursor also needs to be initialized in order to interact with TDengine from Node.js.
import { options, connect } from "td2.0-rest-connector";
options.path = "/rest/sqlt";
import { options, connect } from "@tdengine/rest";
options.path = "/rest/sql";
// set host
options.host = "localhost";
// set other options like user/passwd
......@@ -190,26 +190,23 @@ let cursor = conn.cursor();
<NodeQuery />
## 更多示例程序
| 示例程序 | 示例程序描述 |
| ------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------- |
| [connection](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/cursorClose.js) | 建立连接的示例。 |
| [stmtBindBatch](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindParamBatchSample.js) | 绑定多行参数插入的示例。 |
| [stmtBind](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindParamSample.js) | 一行一行绑定参数插入的示例。 |
| [stmtBindSingleParamBatch](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindSingleParamBatchSample.js) | 按列绑定参数插入的示例。 |
| [stmtUseResult](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtUseResultSample.js) | 绑定参数查询的示例。 |
| [json tag](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testJsonTag.js) | Json tag 的使用示例。 |
| [Nanosecond](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testNanoseconds.js) | 时间戳为纳秒精度的使用的示例。 |
| [Microsecond](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testMicroseconds.js) | 时间戳为微秒精度的使用的示例。 |
| [schemless insert](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testSchemalessInsert.js) | schemless 插入的示例。 |
| [subscribe](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testSubscribe.js) | 订阅的使用示例。 |
| [asyncQuery](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/tset.js) | 异步查询的使用示例。 |
| [REST](https://github.com/taosdata/taos-connector-node/blob/develop/typescript-rest/example/example.ts) | 使用 REST 连接的 TypeScript 使用示例。 |
| [basicUse](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/queryExample.js) | 基本的使用如如建立连接,执行 SQL 等操作。 |
| [stmtBindBatch](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/bindParamBatch.js) | 绑定多行参数插入的示例。 | |
| [stmtBindSingleParamBatch](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/bindSingleParamBatch.js) | 按列绑定参数插入的示例。 |
| [stmtQuery](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/stmtQuery.js) | 绑定参数查询的示例。 |
| [schemless insert](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/schemaless.js) | schemless 插入的示例。 |
| [TMQ](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/tmq.js) | 订阅的使用示例。 |
| [asyncQuery](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/asyncQueryExample.js) | 异步查询的使用示例。 |
| [REST](https://github.com/taosdata/taos-connector-node/blob/3.0/typescript-rest/example/example.ts) | 使用 REST 连接的 TypeScript 使用示例。 |
## 使用限制
Node.js 连接器 >= v2.0.6 目前支持 node 的版本为:支持 >=v12.8.0 <= v12.9.1 || >=v10.20.0 <= v10.9.0 ;2.0.5 及更早版本支持 v10.x 版本,其他版本可能存在包兼容性的问题。
native 连接器(`@tdengine/client`) >= v3.0.0 目前支持 node 的版本为:支持 >=v12.8.0 <= v12.9.1 || >=v10.20.0 <= v10.9.0 ;2.0.5 及更早版本支持 v10.x 版本,其他版本可能存在包兼容性的问题。
## 其他说明
......@@ -225,7 +222,7 @@ Node.js 连接器的使用参见[视频教程](https://www.taosdata.com/blog/202
2. Node.js 版本
连接器 >v2.0.6 目前兼容的 Node.js 版本为:>=v10.20.0 <= v10.9.0 || >=v12.8.0 <= v12.9.1
原生连接器 `@tdengine/client` 目前兼容的 Node.js 版本为:>=v10.20.0 <= v10.9.0 || >=v12.8.0 <= v12.9.1
3. "Unable to establish connection","Unable to resolve FQDN"
......@@ -235,18 +232,22 @@ Node.js 连接器的使用参见[视频教程](https://www.taosdata.com/blog/202
### 原生连接器
| td2.0-connector 版本 | 说明 |
| -------------------- | ---------------------------------------------------------------- |
| 2.0.12 | 修复 cursor.close() 报错的 bug。 |
| 2.0.11 | 支持绑定参数、json tag、schemaless 接口等功能。 |
| 2.0.10 | 支持连接管理,普通查询、连续查询、获取系统信息、订阅功能等功能。 |
| package name | version | TDengine version | 说明 |
|------------------|---------|---------------------|------------------------------------------------------------------|
| @tdengine/client | 3.0.0 | 3.0.0 | 支持TDengine 3.0 且不与2.x 兼容。 |
| td2.0-connector | 2.0.12 | 2.4.x;2.5.x;2.6.x | 修复 cursor.close() 报错的 bug。 |
| td2.0-connector | 2.0.11 | 2.4.x;2.5.x;2.6.x | 支持绑定参数、json tag、schemaless 接口等功能。 |
| td2.0-connector | 2.0.10 | 2.4.x;2.5.x;2.6.x | 支持连接管理,普通查询、连续查询、获取系统信息、订阅功能等功能。 |
### REST 连接器
| td2.0-rest-connector 版本 | 说明 |
| ------------------------- | ---------------------------------------------------------------- |
| 1.0.3 | 支持连接管理、普通查询、获取系统信息、错误信息、连续查询等功能。 |
| package name | version | TDengine version | 说明 |
|----------------------|---------|---------------------|---------------------------------------------------------------------------|
| @tdengine/rest | 3.0.0 | 3.0.0 | 支持 TDegnine 3.0,且不与2.x 兼容。 |
| td2.0-rest-connector | 1.0.7 | 2.4.x;2.5.x;2.6.x | 移除默认端口 6041。 |
| td2.0-rest-connector | 1.0.6 | 2.4.x;2.5.x;2.6.x | 修复create,insert,update,alter 等SQL 执行返回的 affectRows 错误的bug。 |
| td2.0-rest-connector | 1.0.5 | 2.4.x;2.5.x;2.6.x | 支持云服务 cloud Token; |
| td2.0-rest-connector | 1.0.3 | 2.4.x;2.5.x;2.6.x | 支持连接管理、普通查询、获取系统信息、错误信息、连续查询等功能。 |
## API 参考
[API 参考](https://docs.taosdata.com/api/td2.0-connector/)
[API 参考](https://docs.taosdata.com/api/td2.0-connector/)
\ No newline at end of file
......@@ -147,7 +147,7 @@ import (
"fmt"
"time"
_ "github.com/taosdata/driver-go/v2/taosSql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
type config struct {
......
---
sidebar_label: 发布历史
title: 发布历史
---
import Release from "/components/Release";
<Release versionPrefix="3.0" />
......@@ -2657,6 +2657,34 @@ typedef struct {
SEpSet epSet;
} SVgEpSet;
typedef struct {
int64_t refId;
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......@@ -3055,6 +3083,7 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct {
int32_t msgIdx;
int32_t msgType;
int32_t msgLen;
void* msg;
......@@ -3068,6 +3097,7 @@ typedef struct {
typedef struct {
int32_t reqType;
int32_t msgIdx;
int32_t msgLen;
int32_t rspCode;
void* msg;
......
......@@ -58,12 +58,17 @@ typedef struct SDbInfo {
int64_t dbId;
} SDbInfo;
typedef struct STablesReq {
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTables;
} STablesReq;
typedef struct SCatalogReq {
SArray* pDbVgroup; // element is db full name
SArray* pDbCfg; // element is db full name
SArray* pDbInfo; // element is db full name
SArray* pTableMeta; // element is SNAME
SArray* pTableHash; // element is SNAME
SArray* pTableMeta; // element is STablesReq
SArray* pTableHash; // element is STablesReq
SArray* pUdf; // element is udf name
SArray* pIndex; // element is index name
SArray* pUser; // element is SUserAuthInfo
......
......@@ -103,7 +103,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
/**
*
......
......@@ -202,6 +202,7 @@ bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId);
bool fmIsNotNullOutputFunc(int32_t funcId);
bool fmIsSelectValueFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId);
......
......@@ -41,12 +41,13 @@ typedef struct {
typedef struct SRpcHandleInfo {
// rpc info
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
STraceId traceId;
int8_t hasEpSet;
// app info
void *ahandle; // app handle set by client
......@@ -69,8 +70,9 @@ typedef struct SRpcMsg {
SRpcHandleInfo info;
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
......@@ -84,12 +86,15 @@ typedef struct SRpcInit {
// the following is for client app ecurity only
char *user; // user name
// call back to process incoming msg, code shall be ignored by server app
// call back to process incoming msg
RpcCfp cfp;
// user defined retry func
// retry not not for particular msg
RpcRfp rfp;
// set up timeout for particular msg
RpcTfp tfp;
void *parent;
} SRpcInit;
......
......@@ -46,6 +46,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
//common & util
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
......
......@@ -25,9 +25,9 @@ extern "C" {
// reference counting
typedef void (*_ref_fn_t)(const void *pObj);
#define T_REF_DECLARE() \
struct { \
int32_t val; \
#define T_REF_DECLARE() \
struct { \
volatile int32_t val; \
} _ref;
#define T_REF_REGISTER_FUNC(s, e) \
......
......@@ -60,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
}
static void deregisterRequest(SRequestObj *pRequest) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
assert(pRequest != NULL);
STscObj *pTscObj = pRequest->pTscObj;
......@@ -77,13 +77,13 @@ static void deregisterRequest(SRequestObj *pRequest) {
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
}
if (duration >= SLOW_QUERY_INTERVAL) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
}
releaseTscObj(pTscObj->id);
}
......@@ -109,6 +109,14 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
}
}
// start timer for particular msgType
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
return true;
}
return false;
}
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
......@@ -118,6 +126,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
rpcInit.tfp = clientRpcTfp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
......@@ -375,7 +384,7 @@ void taos_init_imp(void) {
initQueryModuleMsgHandle();
taosConvInit();
rpcInit();
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
......
......@@ -1308,8 +1308,8 @@ int32_t doProcessMsgFromServer(void* param) {
char tbuf[40] = {0};
TRACE_TO_STR(trace, tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s, gtid: %s", pMsg->info.handle,
TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
......@@ -1922,7 +1922,7 @@ _OVER:
return code;
}
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t acctId, char* db) {
SName name;
......@@ -1957,20 +1957,33 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
return -1;
}
taosArrayPush(pList, &name);
char dbFName[TSDB_DB_FNAME_LEN];
sprintf(dbFName, "%d.%.*s", acctId, dbLen, dbName);
STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
if (pDb) {
taosArrayPush(pDb->pTables, &name);
} else {
STablesReq db;
db.pTables = taosArrayInit(20, sizeof(SName));
strcpy(db.dbFName, dbFName);
taosArrayPush(db.pTables, &name);
taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db));
}
return TSDB_CODE_SUCCESS;
}
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
*pReq = taosArrayInit(10, sizeof(SName));
if (NULL == *pReq) {
SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pHash) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
bool inEscape = false;
int32_t code = 0;
void *pIter = NULL;
int32_t vIdx = 0;
int32_t vPos[2];
......@@ -1985,7 +1998,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
......@@ -2035,7 +2048,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
......@@ -2067,14 +2080,31 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
goto _return;
}
int32_t dbNum = taosHashGetSize(pHash);
*pReq = taosArrayInit(dbNum, sizeof(STablesReq));
pIter = taosHashIterate(pHash, NULL);
while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayPush(*pReq, pDb);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return TSDB_CODE_SUCCESS;
_return:
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
taosArrayDestroy(*pReq);
*pReq = NULL;
pIter = taosHashIterate(pHash, NULL);
while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayDestroy(pDb->pTables);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return terrno;
}
......
......@@ -308,9 +308,9 @@ static const SSysDbTableSchema offsetSchema[] = {
};
static const SSysDbTableSchema querySchema[] = {
{.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "kill_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "query_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......@@ -354,11 +354,19 @@ static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}};
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
*pInfosTableMeta = infosMeta;
*size = tListLen(infosMeta);
if (pInfosTableMeta) {
*pInfosTableMeta = infosMeta;
}
if (size) {
*size = tListLen(infosMeta);
}
}
void getPerfDbMeta(const SSysTableMeta** pPerfsTableMeta, size_t* size) {
*pPerfsTableMeta = perfsMeta;
*size = tListLen(perfsMeta);
if (pPerfsTableMeta) {
*pPerfsTableMeta = perfsMeta;
}
if (size) {
*size = tListLen(perfsMeta);
}
}
......@@ -1691,13 +1691,17 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
if (!pShow->sysDbRsp) {
SDbObj infoschemaDb = {0};
setInformationSchemaDbCfg(&infoschemaDb);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true, 0, 1);
size_t numOfTables = 0;
getInfosDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
SDbObj perfschemaDb = {0};
setPerfSchemaDbCfg(&perfschemaDb);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true, 0, 1);
numOfTables = 0;
getPerfDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
pShow->sysDbRsp = true;
......
......@@ -84,6 +84,9 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType);
......@@ -111,6 +114,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} else {
rsp.rspCode = 0;
}
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp;
......@@ -136,6 +140,8 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
......
......@@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
smaObj.stbUid = pStb->uid;
......@@ -603,6 +603,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mDebug("mndSma: create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d",
pCreate->name, smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
code = 0;
_OVER:
......
......@@ -115,24 +115,29 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t
int64_t refId;
tmr_h tmrId;
int32_t maxDelay;
int8_t level;
int8_t triggerStat;
int32_t maxDelay;
tmr_h tmrId;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
int64_t refId; // refId of SRSmaStat
int8_t delFlag;
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
};
#define RSMA_INFO_HEAD_LEN 24
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_HEAD_LEN 32
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
enum {
TASK_TRIGGER_STAT_INIT = 0,
......@@ -168,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
......@@ -223,12 +228,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
......
......@@ -381,6 +381,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
} else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
terrno = TSDB_CODE_SUCCESS;
}
metaReaderClear(&mr);
......
......@@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat
......@@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
// step 3: swap rsmaInfoHash and iRsmaInfoHash
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
......@@ -383,26 +391,52 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: merge rsmaInfoHash and iRsmaInfoHash
taosWLockLatch(SMA_ENV_LOCK(pSmaEnv));
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty
}
// just switch the hash pointer if rsmaInfoHash is empty
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
#endif
#if 1
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
continue;
}
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
......@@ -416,11 +450,14 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
}
#endif
// }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv));
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
......
......@@ -17,7 +17,7 @@
typedef struct SSmaStat SSmaStat;
#define SMA_MGMT_REF_NUM 10240
#define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt;
......@@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
if (!pRSmaInfo) return 0;
int ref = T_REF_INC(pRSmaInfo);
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
return 0;
......@@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int ref = T_REF_DEC(pRSmaInfo);
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
if (ref == 0) {
tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
}
return 0;
}
......
......@@ -116,8 +116,10 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
}
// create stable to save tsma result in dstVgId
SName stbFullName = {0};
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateStbReq pReq = {0};
pReq.name = pCfg->dstTbName;
pReq.name = (char*)tNameGetTableName(&stbFullName);
pReq.suid = pCfg->dstTbUid;
pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag;
......@@ -125,6 +127,12 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
return -1;
}
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d",
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
} else {
ASSERT(0);
}
return 0;
......
......@@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) {
}
// smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
smaWarn("rsma acquire ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
} else {
smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
smaDebug("rsma acquire ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
}
return pResult;
}
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
smaWarn("rsma release ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
return TSDB_CODE_SUCCESS;
}
......@@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char *pOutput = NULL;
int32_t len = 0;
if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) {
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
......@@ -337,41 +337,34 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
goto _err;
}
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief pTSchema is shared
*
* @param pSma
* @param pDest
* @param pSrc
* @return int32_t
*
* @param pSma
* @param pDest
* @param pSrc
* @return int32_t
*/
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
SVnode *pVnode = pSma->pVnode;
SRSmaParam *param = NULL;
if (!pSrc) {
*pDest = NULL;
return TSDB_CODE_SUCCESS;
}
if (!pDest) {
pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (!pDest) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
memcpy(pDest, pSrc, sizeof(SRSmaInfo));
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
......@@ -384,21 +377,22 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
ASSERT(mr.me.uid == pSrc->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pSrc->items[i];
if (pItem->taskInfo) {
tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
}
metaReaderClear(&mr);
*pDest = pSrc; // pointer copy
return TSDB_CODE_SUCCESS;
_err:
*pDest = NULL;
metaReaderClear(&mr);
tdFreeRSmaInfo(pSma, pDest, false);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
return TSDB_CODE_FAILED;
}
// ...
\ No newline at end of file
}
\ No newline at end of file
......@@ -223,6 +223,8 @@ int vnodeCommit(SVnode *pVnode) {
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
......
......@@ -273,6 +273,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
......@@ -301,6 +304,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
break;
}
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code;
......@@ -327,6 +331,8 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
......
......@@ -351,7 +351,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
}
......@@ -852,6 +851,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
pRsp->code = terrno;
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
......
......@@ -32,6 +32,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_RENT_SLOT_SECOND 1.5
......@@ -80,6 +81,8 @@ typedef enum {
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
CTG_TASK_GET_SVR_VER,
CTG_TASK_GET_TB_META_BATCH,
CTG_TASK_GET_TB_HASH_BATCH,
} CTG_TASK_TYPE;
typedef enum {
......@@ -110,6 +113,23 @@ typedef struct SCtgTbMetaCtx {
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgFetch {
int32_t dbIdx;
int32_t tbIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t flag;
SCtgTbCacheInfo tbInfo;
int32_t vgId;
} SCtgFetch;
typedef struct SCtgTbMetasCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbMetasCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
} SCtgTbIndexCtx;
......@@ -137,6 +157,14 @@ typedef struct SCtgTbHashCtx {
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgTbHashsCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbHashsCtx;
typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;
......@@ -211,6 +239,7 @@ typedef struct SCtgBatch {
SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTaskIds;
SArray* pMsgIdxs;
} SCtgBatch;
typedef struct SCtgJob {
......@@ -218,6 +247,7 @@ typedef struct SCtgJob {
int32_t batchId;
SHashObj* pBatchs;
SArray* pTasks;
int32_t subTaskNum;
int32_t taskDone;
SMetaData jobRes;
int32_t jobResCode;
......@@ -258,6 +288,7 @@ typedef struct SCtgTaskCallbackParam {
SArray* taskId;
int32_t reqType;
int32_t batchId;
SArray* msgIdx;
} SCtgTaskCallbackParam;
......@@ -276,6 +307,7 @@ typedef struct SCtgTask {
int32_t taskId;
SCtgJob* pJob;
void* taskCtx;
SArray* msgCtxs;
SCtgMsgCtx msgCtx;
int32_t code;
void* res;
......@@ -286,9 +318,14 @@ typedef struct SCtgTask {
SHashObj* pBatchs;
} SCtgTask;
typedef struct SCtgTaskReq {
SCtgTask* pTask;
int32_t msgIdx;
} SCtgTaskReq;
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTaskReq*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**);
typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*);
......@@ -487,6 +524,8 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_GET_TASK_MSGCTX(_task, _id) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
......@@ -586,6 +625,7 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
......@@ -631,7 +671,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTaskReq* tReq);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
......@@ -639,9 +679,9 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
......@@ -664,6 +704,7 @@ void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
......@@ -672,8 +713,11 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
int32_t ctgGetTablesReqNum(SArray *pList);
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
......@@ -682,6 +726,7 @@ void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
extern SCatalogMgmt gCtgMgmt;
......
此差异已折叠。
......@@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S
goto _return;
}
int32_t sz = 0;
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
......@@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
goto _return;
}
int32_t sz = 0;
char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
......@@ -2152,6 +2150,254 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
return TSDB_CODE_SUCCESS;
}
#if 0
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0);
int32_t fIdx = 0;
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(ctx->pNames, i);
SCtgTbMetaCtx nctx = {0};
nctx.flag = CTG_FLAG_UNKNOWN_STB;
nctx.pName = pName;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(nctx.flag);
}
STableMeta *pTableMeta = NULL;
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
SMetaRes res = {0};
if (pTableMeta) {
if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
res.pRes = pTableMeta;
} else {
taosMemoryFreeClear(pTableMeta);
}
}
if (NULL == res.pRes) {
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
}
SCtgFetch fetch = {0};
fetch.tbIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = nctx.flag;
taosArrayPush(ctx->pFetchs, &fetch);
}
taosArrayPush(ctx->pResList, &res);
}
if (NULL == ctx->pFetchs) {
TSWAP(*pResList, ctx->pResList);
}
return TSDB_CODE_SUCCESS;
}
#endif
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) {
int32_t tbNum = taosArrayGetSize(pList);
SName* pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta* lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag);
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
STableMeta* tbMeta = pCache->pMeta;
SCtgTbMetaCtx nctx = {0};
nctx.flag = flag;
nctx.tbInfo.inCache = true;
nctx.tbInfo.dbId = dbCache->dbId;
nctx.tbInfo.suid = tbMeta->suid;
nctx.tbInfo.tbType = tbMeta->tableType;
SMetaRes res = {0};
STableMeta* pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
// PROCESS FOR CHILD TABLE
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
cloneTableMeta(lastTableMeta, &pTableMeta);
memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
int32_t metaSize = sizeof(SCTableMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
pName->tname, nctx.tbInfo.tbType, dbFName);
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
if (NULL == pCache) {
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
taosHashRelease(dbCache->stbCache, stName);
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
STableMeta* stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
}
metaSize = CTG_META_SIZE(stbMeta);
pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
lastSuid = pTableMeta->suid;
lastTableMeta = pTableMeta;
}
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0;
......
此差异已折叠。
......@@ -26,6 +26,7 @@ void ctgFreeMsgSendParam(void* param) {
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
taosArrayDestroy(pParam->taskId);
taosArrayDestroy(pParam->msgIdx);
taosMemoryFree(param);
}
......@@ -88,6 +89,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[get user]";
case CTG_TASK_GET_SVR_VER:
return "[get svr ver]";
case CTG_TASK_GET_TB_META_BATCH:
return "[bget table meta]";
case CTG_TASK_GET_TB_HASH_BATCH:
return "[bget table hash]";
default:
return "unknown";
}
......@@ -460,6 +465,25 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->res);
}
void ctgFreeBatchMeta(void* meta) {
if (NULL == meta) {
return;
}
SMetaRes* pRes = (SMetaRes*)meta;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeBatchHash(void* hash) {
if (NULL == hash) {
return;
}
SMetaRes* pRes = (SMetaRes*)hash;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) {
case CTG_TASK_GET_QNODE:
......@@ -500,6 +524,24 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchMeta(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchHash(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -554,6 +596,16 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchMeta);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchHash);
*pRes = NULL;
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -583,12 +635,38 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
SCtgTbMetasCtx* taskCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SCtgTbHashsCtx* taskCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_INDEX: {
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
......@@ -679,6 +757,23 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ
return TSDB_CODE_SUCCESS;
}
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target) {
SCtgMsgCtx ctx = {0};
ctx.reqType = reqType;
ctx.out = out;
if (target) {
ctx.target = strdup(target);
if (NULL == ctx.target) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
taosArrayPush(pCtxs, &ctx);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
switch (hashMethod) {
......@@ -780,6 +875,104 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code);
}
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
int32_t tbNum = taosArrayGetSize(pNames);
if (1 == vgNum) {
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*vgInfo = *(SVgroupInfo*)pIter;
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = vgInfo;
} else {
res.pRes = vgInfo;
taosArrayPush(pCtx->pResList, &res);
}
}
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFullName, "%s.", dbFName);
int32_t offset = strlen(tbFullName);
SName* pName = NULL;
int32_t tbNameLen = 0;
for (int32_t i = 0; i < tbNum; ++i) {
pName = taosArrayGet(pNames, i);
tbNameLen = offset + strlen(pName->tname);
strcpy(tbFullName + offset, pName->tname);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
break;
}
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == pNewVg) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*pNewVg = *vgInfo;
ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = pNewVg;
} else {
res.pRes = pNewVg;
taosArrayPush(pCtx->pResList, &res);
}
}
CTG_RET(code);
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
return -1;
......@@ -921,4 +1114,41 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, cha
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTablesReqNum(SArray *pList) {
if (NULL == pList) {
return 0;
}
int32_t total = 0;
int32_t n = taosArrayGetSize(pList);
for (int32_t i = 0; i < n; ++i) {
STablesReq *pReq = taosArrayGet(pList, i);
total += taosArrayGetSize(pReq->pTables);
}
return total;
}
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag) {
if (NULL == (*pFetchs)) {
*pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.dbIdx = dbIdx;
fetch.tbIdx = tbIdx;
fetch.fetchIdx = (*fetchIdx)++;
fetch.resIdx = resIdx;
fetch.flag = flag;
taosArrayPush(*pFetchs, &fetch);
return TSDB_CODE_SUCCESS;
}
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
}
......@@ -737,6 +737,9 @@ typedef struct STimeSliceOperatorInfo {
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
......@@ -987,7 +990,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model);
char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
......
......@@ -341,7 +341,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......
......@@ -1437,7 +1437,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId;
}
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
if (!isRowEntryInitialized(pResInfo)) {
......@@ -1447,6 +1448,15 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
}
if (fmIsNotNullOutputFunc(pCtx[j].functionId)) {
returnNotNull = true;
}
}
// if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
// except for first/last, which require not null output, output no rows
if (pRow->numOfRows == 0 && !returnNotNull) {
pRow->numOfRows = 1;
}
}
......@@ -1458,7 +1468,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
return 0;
......@@ -1514,7 +1524,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseBufPage(pBuf, page);
......@@ -4492,7 +4502,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model) {
char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -4503,6 +4513,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
......@@ -4515,6 +4526,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code;
_complete:
taosMemoryFree(sql);
doDestroyTask(*pTaskInfo);
terrno = code;
return code;
......
......@@ -2065,10 +2065,30 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
memcpy(pkey->pData, val, pkey->bytes);
}
}
pSliceInfo->isPrevRowSet = true;
}
static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
// null data should not be kept since it can not be used to perform interpolation
if (!colDataIsNull_s(pColInfoData, i)) {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i);
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, rowIndex);
memcpy(pkey->pData, val, pkey->bytes);
}
}
pSliceInfo->isNextRowSet = true;
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
int32_t rowIndex, SSDataBlock* pResBlock) {
SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column
......@@ -2144,6 +2164,10 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
case TSDB_FILL_PREV: {
if (!pSliceInfo->isPrevRowSet) {
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
pResBlock->info.rows += 1;
......@@ -2151,8 +2175,12 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
}
case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
if (!pSliceInfo->isNextRowSet) {
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
pResBlock->info.rows += 1;
break;
}
......@@ -2186,6 +2214,35 @@ static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
taosArrayPush(pInfo->pPrevRow, &key);
}
pInfo->isPrevRowSet = false;
return TSDB_CODE_SUCCESS;
}
static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
if (pInfo->pNextRow != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pNextRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SGroupKeys key = {0};
key.bytes = pColInfo->info.bytes;
key.type = pColInfo->info.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
taosArrayPush(pInfo->pNextRow, &key);
}
pInfo->isNextRowSet = false;
return TSDB_CODE_SUCCESS;
}
......@@ -2215,14 +2272,19 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
//int32_t numOfRows = 0;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock);
int32_t code;
code = initPrevRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = initNextRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
......@@ -2244,7 +2306,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i);
//colDataAppend(pDst, numOfRows, v, false);
colDataAppend(pDst, pResBlock->info.rows, v, false);
}
......@@ -2262,11 +2323,16 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
} else if (ts < pSliceInfo->current) {
// in case interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) {
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
......@@ -2285,8 +2351,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
......@@ -2326,9 +2395,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
//check if need to interpolate after ts range
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pBlock->info.rows - 1, pResBlock);
// check if need to interpolate after ts range
// except for fill(next)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
......
......@@ -64,6 +64,9 @@ typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
STuplePos nullTuplePos;
bool nullTupleSaved;
} SMinmaxResInfo;
typedef struct STopBotResItem {
......@@ -75,6 +78,10 @@ typedef struct STopBotResItem {
typedef struct STopBotRes {
int32_t maxSize;
int16_t type;
STuplePos nullTuplePos;
bool nullTupleSaved;
STopBotResItem* pItems;
} STopBotRes;
......@@ -221,6 +228,10 @@ typedef struct SSampleInfo {
int32_t numSampled;
uint8_t colType;
int16_t colBytes;
STuplePos nullTuplePos;
bool nullTupleSaved;
char* data;
STuplePos* tuplePos;
} SSampleInfo;
......@@ -1134,6 +1145,9 @@ bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo);
buf->assign = false;
buf->tuplePos.pageId = -1;
buf->nullTupleSaved = false;
buf->nullTuplePos.pageId = -1;
return true;
}
......@@ -1575,6 +1589,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
_min_max_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
pBuf->nullTupleSaved = true;
}
return numOfElems;
}
......@@ -1615,7 +1633,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
setNullSelectivityValue(pCtx, pBlock, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
}
return pEntryInfo->numOfRes;
......@@ -3366,6 +3384,8 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pRes->maxSize = pCtx->param[1].param.i;
pRes->nullTupleSaved = false;
pRes->nullTuplePos.pageId = -1;
return true;
}
......@@ -3403,6 +3423,10 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
pRes->nullTupleSaved = true;
}
return TSDB_CODE_SUCCESS;
}
......@@ -3427,6 +3451,11 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
pRes->nullTupleSaved = true;
}
return TSDB_CODE_SUCCESS;
}
......@@ -3625,6 +3654,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
if (pEntryInfo->numOfRes <= 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
return pEntryInfo->numOfRes;
}
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i];
if (type == TSDB_DATA_TYPE_FLOAT) {
......@@ -4897,7 +4931,8 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
pInfo->numSampled = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
pInfo->nullTuplePos.pageId = -1;
pInfo->nullTupleSaved = false;
pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
......@@ -4943,6 +4978,11 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
doReservoirSample(pCtx, pInfo, data, i);
}
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
pInfo->nullTupleSaved = true;
}
SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled);
return TSDB_CODE_SUCCESS;
}
......@@ -4957,6 +4997,11 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
if (pInfo->numSampled == 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
return pInfo->numSampled;
}
for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
......
......@@ -221,6 +221,18 @@ bool fmIsLastRowFunc(int32_t funcId) {
return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type;
}
bool fmIsNotNullOutputFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_LAST == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_LAST_PARTIAL == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_LAST_MERGE == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST_PARTIAL == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST_MERGE == funcMgtBuiltins[funcId].type;
}
bool fmIsSelectValueFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......
......@@ -39,6 +39,11 @@ typedef struct SMsgBuf {
char* buf;
} SMsgBuf;
typedef struct SParseTablesMetaReq {
char dbFName[TSDB_DB_FNAME_LEN];
SHashObj* pTables;
} SParseTablesMetaReq;
typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
......@@ -95,7 +100,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
#ifdef __cplusplus
}
......
......@@ -474,6 +474,24 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
return TSDB_CODE_SUCCESS;
}
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
if (NULL != pDbsHash) {
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
while (NULL != p) {
STablesReq req = {0};
strcpy(req.dbFName, p->dbFName);
buildTableReq(p->pTables, &req.pTables);
taosArrayPush(*pDbs, &req);
p = taosHashIterate(pDbsHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
if (NULL != pUserAuthHash) {
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
......@@ -513,12 +531,12 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
}
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
code = buildTableReqFromDb(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
......@@ -587,6 +605,24 @@ static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHa
return TSDB_CODE_SUCCESS;
}
static int32_t putDbTableDataToCache(const SArray* pDbReq, const SArray* pTableData, SHashObj** pTable) {
int32_t ndbs = taosArrayGetSize(pDbReq);
int32_t tableNo = 0;
for (int32_t i = 0; i < ndbs; ++i) {
STablesReq* pReq = taosArrayGet(pDbReq, i);
int32_t ntables = taosArrayGetSize(pReq->pTables);
for (int32_t j = 0; j < ntables; ++j) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(taosArrayGet(pReq->pTables, j), fullName);
if (TSDB_CODE_SUCCESS != putMetaDataToHash(fullName, strlen(fullName), pTableData, tableNo, pTable)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
++tableNo;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) {
int32_t nvgs = taosArrayGetSize(pUserAuthReq);
for (int32_t i = 0; i < nvgs; ++i) {
......@@ -612,12 +648,12 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t code = putTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = putTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup);
code = putDbTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg);
......@@ -657,14 +693,38 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha
return reserveTableReqInCacheImpl(fullName, len, pTables);
}
static int32_t reserveTableReqInDbCacheImpl(int32_t acctId, const char* pDb, const char* pTable, SHashObj* pDbs) {
SParseTablesMetaReq req = {0};
int32_t len = snprintf(req.dbFName, sizeof(req.dbFName), "%d.%s", acctId, pDb);
int32_t code = reserveTableReqInCache(acctId, pDb, pTable, &req.pTables);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pDbs, req.dbFName, len, &req, sizeof(SParseTablesMetaReq));
}
return code;
}
static int32_t reserveTableReqInDbCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pDbs) {
if (NULL == *pDbs) {
*pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char fullName[TSDB_DB_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
SParseTablesMetaReq* pReq = taosHashGet(*pDbs, fullName, len);
if (NULL == pReq) {
return reserveTableReqInDbCacheImpl(acctId, pDb, pTable, *pDbs);
}
return reserveTableReqInCache(acctId, pDb, pTable, &pReq->pTables);
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
}
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta);
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
......@@ -711,13 +771,11 @@ int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName,
}
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
}
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup);
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableVgroup);
}
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
......@@ -919,10 +977,24 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
return TSDB_CODE_SUCCESS;
}
void destoryParseMetaCache(SParseMetaCache* pMetaCache) {
taosHashCleanup(pMetaCache->pTableMeta);
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
while (NULL != p) {
taosHashCleanup(p->pTables);
p = taosHashIterate(pHash, p);
}
taosHashCleanup(pHash);
}
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
if (request) {
destoryParseTablesMetaReqHash(pMetaCache->pTableMeta);
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
} else {
taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pTableVgroup);
}
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pTableVgroup);
taosHashCleanup(pMetaCache->pDbCfg);
taosHashCleanup(pMetaCache->pDbInfo);
taosHashCleanup(pMetaCache->pUserAuth);
......
......@@ -85,13 +85,13 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
taosMemoryFreeClear(pVal->datum.p);
}
if (pParam->is_null && 1 == *(pParam->is_null)) {
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return TSDB_CODE_SUCCESS;
}
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
pVal->node.resType.type = pParam->buffer_type;
pVal->node.resType.bytes = inputSize;
......@@ -187,7 +187,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(&metaCache, pCatalogReq);
}
destoryParseMetaCache(&metaCache);
destoryParseMetaCache(&metaCache, true);
terrno = code;
return code;
}
......@@ -203,7 +203,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
code = analyseSemantic(pCxt, pQuery, &metaCache);
}
}
destoryParseMetaCache(&metaCache);
destoryParseMetaCache(&metaCache, false);
terrno = code;
return code;
}
......
......@@ -50,6 +50,7 @@ struct MockTableMeta {
class MockCatalogServiceImpl;
class MockCatalogService {
public:
static void destoryTablesReq(void* p);
static void destoryCatalogReq(SCatalogReq* pReq);
static void destoryMetaRes(void* p);
static void destoryMetaArrayRes(void* p);
......
......@@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_SUCCESS;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -537,6 +537,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
pTask->maxRetryTimes++;
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec *= 2;
if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册