提交 701c1141 编写于 作者: M Minglei Jin

Merge branch 'v3.0' into fix/TD-18437

# rust-bindings
ExternalProject_Add(rust-bindings
GIT_REPOSITORY https://github.com/songtianyi/tdengine-rust-bindings.git
GIT_TAG 7ed7a97
SOURCE_DIR "${TD_SOURCE_DIR}/examples/rust"
BINARY_DIR "${TD_SOURCE_DIR}/examples/rust"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
......@@ -105,11 +105,6 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
# rust-bindings
if(${RUST_BINDINGS})
cat("${TD_SUPPORT_DIR}/rust-bindings_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${RUST_BINDINGS})
# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......
......@@ -7,12 +7,13 @@ import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3";
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
为方便使用,标准的服务端安装包包含了 taos、taosd、taosAdapter、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用 Docker 立即体验](../../get-started/docker/)。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。TDengine 也提供 Windows x64 平台的安装包。
## 安装
......
......@@ -807,300 +807,9 @@ SHOW SUBSCRIPTIONS;
以下是各语言的完整示例代码。
<Tabs defaultValue="java" groupId="lang">
<TabItem label="C" value="c">
```c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
}
return rows;
}
static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes;
// drop database if exists
printf("create database\n");
pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create super table
printf("create super table\n");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create sub tables
printf("create sub tables\n");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// insert data
printf("insert data into sub tables\n");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
return NULL;
}
return topicList;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
return;
}
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
/*} else {*/
/*break;*/
}
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int main(int argc, char* argv[]) {
int32_t code;
if (init_env() < 0) {
return -1;
}
if (create_topic() < 0) {
return -1;
}
tmq_t* tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n");
return -1;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe\n");
}
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% Consumer closed\n");
}
return 0;
}
```
[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c)
<TabItem label="C" value="c">
<CDemo>
</TabItem>
<TabItem label="Java" value="java">
......@@ -1116,22 +825,7 @@ int main(int argc, char* argv[]) {
</TabItem>
<TabItem label="Python" value="Python">
```python
import taos
from taos.tmq import TaosConsumer
import taos
from taos.tmq import *
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
```
[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)
<Python />
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
......
```c
{{#include docs/examples/c/subscribe_demo.c}}
```
\ No newline at end of file
{{#include docs/examples/c/tmq-example.c}}
```
```py
{{#include docs/examples/python/subscribe_demo.py}}
```
\ No newline at end of file
{{#include docs/examples/python/tmq_example.py}}
```
[package]
name = "rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
taos = "*"
[dev-dependencies]
chrono = "0.4"
itertools = "0.10.3"
pretty_env_logger = "0.4.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
anyhow = "1"
use anyhow::Result;
use serde::Deserialize;
use taos::*;
#[tokio::main]
async fn main() -> Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
taos.exec_many([
"drop database if exists test",
"create database test keep 36500",
"use test",
"create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))",
])
.await?;
let mut stmt = Stmt::init(&taos)?;
stmt.prepare(
"insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
assert_eq!(rows, 1);
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct Row {
ts: String,
c1: bool,
c2: i8,
c3: i16,
c4: i32,
c5: i64,
c6: u8,
c7: u16,
c8: u32,
c9: u64,
c10: Option<f32>,
c11: f64,
c12: String,
c13: String,
t1: serde_json::Value,
}
let rows: Vec<Row> = taos
.query("select * from tb1")
.await?
.deserialize()
.try_collect()
.await?;
let row = &rows[0];
dbg!(&row);
assert_eq!(row.c5, i64::MAX);
assert_eq!(row.c8, u32::MAX);
assert_eq!(row.c9, u64::MAX);
assert_eq!(row.c10.unwrap(), f32::MAX);
// assert_eq!(row.c11, f64::MAX);
assert_eq!(row.c12, "ABC");
assert_eq!(row.c13, "涛思数据");
Ok(())
}
use anyhow::Result;
use serde::Deserialize;
use taos::*;
#[tokio::main]
async fn main() -> Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
taos.exec_many([
"drop database if exists test_bindable",
"create database test_bindable keep 36500",
"use test_bindable",
"create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
c10 float, c11 double, c12 varchar(100), c13 nchar(100))",
])
.await?;
let mut stmt = Stmt::init(&taos)?;
stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
let params = vec![
ColumnView::from_millis_timestamp(vec![0]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
assert_eq!(rows, 1);
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct Row {
ts: String,
c1: bool,
c2: i8,
c3: i16,
c4: i32,
c5: i64,
c6: u8,
c7: u16,
c8: u32,
c9: u64,
c10: Option<f32>,
c11: f64,
c12: String,
c13: String,
}
let rows: Vec<Row> = taos
.query("select * from tb1")
.await?
.deserialize()
.try_collect()
.await?;
let row = &rows[0];
dbg!(&row);
assert_eq!(row.c5, i64::MAX);
assert_eq!(row.c8, u32::MAX);
assert_eq!(row.c9, u64::MAX);
assert_eq!(row.c10.unwrap(), f32::MAX);
// assert_eq!(row.c11, f64::MAX);
assert_eq!(row.c12, "ABC");
assert_eq!(row.c13, "涛思数据");
Ok(())
}
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://";
let opts = PoolBuilder::new()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_secs(2));
let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
let taos = pool.get()?;
let db = "query";
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
loop {
let count: usize = taos
.query_one("select count(*) from `meters`")
.await?
.unwrap_or_default();
if count >= 6 {
break;
} else {
println!("waiting for data");
}
}
let mut result = taos.query("select tbname, * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
// Query option 1, use rows stream.
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
tbname: String,
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select tbname, * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(result.summary());
assert_eq!(records.len(), 6);
dbg!(records);
Ok(())
}
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// std::env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "tmq";
// prepare database
taos.exec_many([
"DROP TOPIC IF EXISTS tmq_meters".to_string(),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
consumer.unsubscribe().await;
task.await??;
Ok(())
}
fn main() {
println!("Hello, world!");
}
......@@ -2555,10 +2555,14 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid;
SArray* colIdList; // SArray<int16_t>
} SCheckAlterInfo;
} STqCheckInfo;
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
} STqDelCheckInfoReq;
typedef struct {
int32_t vgId;
......
......@@ -188,7 +188,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
......
......@@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
......
......@@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
tDecoderClear(&decoder);
return 0;
}
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
......@@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
tEndDecode(decoder);
return 0;
}
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
return 0;
}
#if 1
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
snprintf(buf, maxLen, "offset(reset to none)");
......@@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
}
return 0;
}
#endif
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == pRight->type) {
......@@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0;
}
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
int32_t sz = taosArrayGetSize(pInfo->colIdList);
......@@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
return pEncoder->pos;
}
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz;
......
......@@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
......@@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbRelease(pSdb, pOffset);
}
return code;
return code;
}
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
......
......@@ -356,31 +356,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
}
ASSERT(pIter == NULL);
// 7. handle unassigned vg
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
// if has consumer, assign all left vg
while (1) {
SMqConsumerEp *pConsumerEp = NULL;
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) break;
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
if (pRemovedIter == NULL) {
if (pIter != NULL) {
taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
pIter = NULL;
}
break;
}
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
break;
}
}
pRebVg = (SMqRebOutputVg *)pRemovedIter;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
continue;
}
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
} else {
......@@ -571,7 +584,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
/*ASSERT(pTopic);*/
if (pTopic == NULL) {
mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic);
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
continue;
}
taosRLockLatch(&pTopic->lock);
......@@ -601,7 +614,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// TODO replace assert with error check
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped");
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
......
......@@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
......@@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) {
SCheckAlterInfo info;
STqCheckInfo info;
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds;
......@@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info
int32_t len;
int32_t code;
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
......@@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
......@@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
action.msgType = TDMT_VND_ADD_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
......@@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 0
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1;
}
#endif
// TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
......@@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1;
}
if (pTopic->ntbUid != 0) {
// broadcast to all vnode
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
}
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic);
......
......@@ -24,6 +24,7 @@ target_sources(
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
"src/meta/metaSnapshot.c"
"src/meta/metaCache.c"
# sma
"src/sma/smaEnv.c"
......
......@@ -23,8 +23,9 @@
extern "C" {
#endif
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
typedef struct SMetaCache SMetaCache;
// metaDebug ==================
// clang-format off
......@@ -60,6 +61,13 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
// metaTable ==================
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
// metaCache ==================
int32_t metaCacheOpen(SMeta* pMeta);
void metaCacheClose(SMeta* pMeta);
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
struct SMeta {
TdThreadRwlock lock;
......@@ -84,6 +92,8 @@ struct SMeta {
TTB* pStreamDb;
SMetaIdx* pIdx;
SMetaCache* pCache;
};
typedef struct {
......@@ -92,6 +102,12 @@ typedef struct {
} STbDbKey;
#pragma pack(push, 1)
typedef struct {
tb_uid_t suid;
int64_t version;
int32_t skmVer;
} SUidIdxVal;
typedef struct {
tb_uid_t uid;
int32_t sver;
......
......@@ -117,16 +117,15 @@ typedef struct {
struct STQ {
SVnode* pVnode;
char* path;
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
SHashObj* pPushMgr; // consumerId -> STqHandle*
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
TDB* pMetaStore;
TDB* pMetaDB;
TTB* pExecStore;
TTB* pAlterInfoStore;
TTB* pCheckStore;
SStreamMeta* pStreamMeta;
};
......@@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
typedef struct {
int32_t size;
......
......@@ -130,6 +130,14 @@ int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
typedef struct SMetaInfo {
int64_t uid;
int64_t suid;
int64_t version;
int32_t skmVer;
} SMetaInfo;
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb);
......@@ -155,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
// tq-mq
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#define META_CACHE_BASE_BUCKET 1024
// (uid , suid) : child table
// (uid, 0) : normal table
// (suid, suid) : super table
typedef struct SMetaCacheEntry SMetaCacheEntry;
struct SMetaCacheEntry {
SMetaCacheEntry* next;
SMetaInfo info;
};
struct SMetaCache {
int32_t nEntry;
int32_t nBucket;
SMetaCacheEntry** aBucket;
};
int32_t metaCacheOpen(SMeta* pMeta) {
int32_t code = 0;
SMetaCache* pCache = NULL;
pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache));
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCache->nEntry = 0;
pCache->nBucket = META_CACHE_BASE_BUCKET;
pCache->aBucket = (SMetaCacheEntry**)taosMemoryCalloc(pCache->nBucket, sizeof(SMetaCacheEntry*));
if (pCache->aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pCache);
goto _err;
}
pMeta->pCache = pCache;
_exit:
return code;
_err:
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code;
}
void metaCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
for (int32_t iBucket = 0; iBucket < pMeta->pCache->nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pMeta->pCache->aBucket[iBucket];
while (pEntry) {
SMetaCacheEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->aBucket);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
}
}
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0;
int32_t nBucket;
if (expand) {
nBucket = pCache->nBucket * 2;
} else {
nBucket = pCache->nBucket / 2;
}
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
if (aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// rehash
for (int32_t iBucket = 0; iBucket < pCache->nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
while (pEntry) {
SMetaCacheEntry* pTEntry = pEntry->next;
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
pEntry = pTEntry;
}
}
// final set
taosMemoryFree(pCache->aBucket);
pCache->nBucket = nBucket;
pCache->aBucket = aBucket;
_exit:
return code;
}
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
int32_t code = 0;
// ASSERT(metaIsWLocked(pMeta));
// search
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(pInfo->uid) % pCache->nBucket;
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
ppEntry = &(*ppEntry)->next;
}
if (*ppEntry) { // update
ASSERT(pInfo->suid == (*ppEntry)->info.suid);
if (pInfo->version > (*ppEntry)->info.version) {
(*ppEntry)->info.version = pInfo->version;
(*ppEntry)->info.skmVer = pInfo->skmVer;
}
} else { // insert
if (pCache->nEntry >= pCache->nBucket) {
code = metaRehashCache(pCache, 1);
if (code) goto _exit;
iBucket = TABS(pInfo->uid) % pCache->nBucket;
}
SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
if (pEntryNew == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pEntryNew->info = *pInfo;
pEntryNew->next = pCache->aBucket[iBucket];
pCache->aBucket[iBucket] = pEntryNew;
pCache->nEntry++;
}
_exit:
return code;
}
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->nBucket;
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != uid) {
ppEntry = &(*ppEntry)->next;
}
SMetaCacheEntry* pEntry = *ppEntry;
if (pEntry) {
*ppEntry = pEntry->next;
taosMemoryFree(pEntry);
pCache->nEntry--;
if (pCache->nEntry < pCache->nBucket / 4 && pCache->nBucket > META_CACHE_BASE_BUCKET) {
code = metaRehashCache(pCache, 0);
if (code) goto _exit;
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
_exit:
return code;
}
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->nBucket;
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
while (pEntry && pEntry->info.uid != uid) {
pEntry = pEntry->next;
}
if (pEntry) {
*pInfo = pEntry->info;
} else {
code = TSDB_CODE_NOT_FOUND;
}
return code;
}
......@@ -73,7 +73,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
}
// open pUidIdx
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) {
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
......@@ -143,6 +143,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err;
}
int32_t code = metaCacheOpen(pMeta);
if (code) {
terrno = code;
metaError("vgId:%d, failed to open meta cache since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
*ppMeta = pMeta;
......@@ -169,6 +176,7 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
......
......@@ -63,7 +63,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
return -1;
}
version = *(int64_t *)pReader->pBuf;
version = ((SUidIdxVal *)pReader->pBuf)[0].version;
return metaGetTableEntryByVersion(pReader, version, uid);
}
......@@ -160,7 +160,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
continue;
}
......@@ -185,7 +185,7 @@ _query:
goto _err;
}
version = *(int64_t *)pData;
version = ((SUidIdxVal *)pData)[0].version;
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
SMetaEntry me = {0};
......@@ -888,3 +888,41 @@ END:
return ret;
}
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
int32_t code = 0;
void *pData = NULL;
int nData = 0;
metaRLock(pMeta);
// search cache
if (metaCacheGet(pMeta, uid, pInfo) == 0) {
metaULock(pMeta);
goto _exit;
}
// search TDB
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
// not found
metaULock(pMeta);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
metaULock(pMeta);
pInfo->uid = uid;
pInfo->suid = ((SUidIdxVal *)pData)->suid;
pInfo->version = ((SUidIdxVal *)pData)->version;
pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer;
// upsert the cache
metaWLock(pMeta);
metaCacheUpsert(pMeta, pInfo);
metaULock(pMeta);
_exit:
tdbFree(pData);
return code;
}
......@@ -28,9 +28,9 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void * pBuf = NULL;
void *pBuf = NULL;
int32_t szBuf = 0;
void * p = NULL;
void *p = NULL;
SMetaReader mr = {0};
// validate req
......@@ -83,8 +83,8 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void * pKey = NULL;
void * pVal = NULL;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
......@@ -130,7 +130,8 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
SUidIdxVal uidIdxVal = {.suid = pME->smaEntry.tsma->indexUid, .version = pME->version, .skmVer = 0};
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
......
......@@ -27,6 +27,23 @@ static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
pInfo->uid = pEntry->uid;
pInfo->version = pEntry->version;
if (pEntry->type == TSDB_SUPER_TABLE) {
pInfo->suid = pEntry->uid;
pInfo->skmVer = pEntry->stbEntry.schemaRow.version;
} else if (pEntry->type == TSDB_CHILD_TABLE) {
pInfo->suid = pEntry->ctbEntry.suid;
pInfo->skmVer = 0;
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
pInfo->suid = 0;
pInfo->skmVer = pEntry->ntbEntry.schemaRow.version;
} else {
ASSERT(0);
}
}
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
if (NULL == pMetaRsp->pSchemas) {
......@@ -171,32 +188,22 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
void *pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
SMetaReader mr = {0};
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (mr.me.type == TSDB_SUPER_TABLE) {
metaReaderClear(&mr);
void *pData = NULL;
int nData = 0;
if (tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name), &pData, &nData) == 0) {
tb_uid_t uid = *(tb_uid_t *)pData;
tdbFree(pData);
SMetaInfo info;
metaGetInfo(pMeta, uid, &info);
if (info.uid == info.suid) {
return 0;
} else {
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
}
/*
// TODO: just for pass case
#if 0
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
#else
metaReaderClear(&mr);
return 0;
#endif
*/
}
metaReaderClear(&mr);
// set structs
me.version = version;
......@@ -275,8 +282,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
// drop super table
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
&pMeta->txn);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid},
sizeof(STbDbKey), &pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
......@@ -319,7 +326,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
return -1;
}
oversion = *(int64_t *)pData;
oversion = ((SUidIdxVal *)pData)[0].version;
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
......@@ -346,15 +353,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
metaSaveToSkmDb(pMeta, &nStbEntry);
}
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
// // change tag schema
// }
// update table.db
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
SMetaInfo info;
metaGetEntryInfo(&nStbEntry, &info);
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t),
&(SUidIdxVal){.suid = info.suid, .version = info.version, .skmVer = info.skmVer}, sizeof(SUidIdxVal), 0);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
metaULock(pMeta);
......@@ -513,7 +519,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
SDecoder dc = {0};
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
int64_t version = *(int64_t *)pData;
int64_t version = ((SUidIdxVal *)pData)[0].version;
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
......@@ -527,7 +533,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
int tLen = 0;
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
version = *(int64_t *)tData;
version = ((SUidIdxVal *)tData)[0].version;
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = version};
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
SDecoder tdc = {0};
......@@ -566,6 +572,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
--pMeta->pVnode->config.vndStats.numOfSTables;
}
metaCacheDrop(pMeta, uid);
tDecoderClear(&dc);
tdbFree(pData);
......@@ -604,7 +612,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
ASSERT(c == 0);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
oversion = ((SUidIdxVal *)pData)[0].version;
// search table.db
TBC *pTbDbc = NULL;
......@@ -718,7 +726,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaUpdateUidIdx(pMeta, &entry);
metaSaveToSkmDb(pMeta, &entry);
......@@ -774,7 +782,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
ASSERT(c == 0);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
oversion = ((SUidIdxVal *)pData)[0].version;
// search table.db
TBC *pTbDbc = NULL;
......@@ -794,8 +802,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
/* get stbEntry*/
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal);
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc2, &stbEntry);
......@@ -869,7 +877,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaSaveToTbDb(pMeta, &ctbEntry);
// save to uid.idx
tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
metaUpdateUidIdx(pMeta, &ctbEntry);
if (iCol == 0) {
metaUpdateTagIdx(pMeta, &ctbEntry);
......@@ -924,7 +932,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
ASSERT(c == 0);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
oversion = ((SUidIdxVal *)pData)[0].version;
// search table.db
TBC *pTbDbc = NULL;
......@@ -969,7 +977,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaUpdateUidIdx(pMeta, &entry);
metaULock(pMeta);
tdbTbcClose(pTbDbc);
......@@ -1052,7 +1060,14 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
// upsert cache
SMetaInfo info;
metaGetEntryInfo(pME, &info);
metaCacheUpsert(pMeta, &info);
SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer};
return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
}
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
......@@ -1128,7 +1143,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
return -1;
}
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
......
......@@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->path = strdup(path);
pTq->pVnode = pVnode;
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (tqMetaOpen(pTq) < 0) {
ASSERT(0);
......@@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
void tqClose(STQ* pTq) {
if (pTq) {
tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles);
taosHashCleanup(pTq->pushMgr);
taosHashCleanup(pTq->pAlterInfo);
taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pPushMgr);
taosHashCleanup(pTq->pCheckInfo);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
......@@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) {
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
STqOffset offset = {0};
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
......@@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
} else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version);
if (offset.val.version + 1 == version) {
offset.val.version += 1;
}
} else {
ASSERT(0);
}
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
/*if (pOffset != NULL) {*/
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
return 0;
}
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0);
return -1;
}
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
......@@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
}
}
// rsp
/*}*/
/*}*/
......@@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pAlterInfo, pIter);
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
if (pIter == NULL) break;
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) {
taosHashCancelIterate(pTq->pAlterInfo, pIter);
taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1;
}
}
......@@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCkHead* pCkHead = NULL;
// 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
......@@ -478,10 +490,10 @@ OVER:
return code;
}
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0);
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
......@@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
SCheckAlterInfo info = {0};
SDecoder decoder;
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
STqCheckInfo info = {0};
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
// todo lock
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
......@@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList);
}
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO
......@@ -668,34 +696,9 @@ FAIL:
return code;
}
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
//
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
#if 0
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (tqExpandTask(pTq, pTask) < 0) {
goto FAIL;
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
#endif
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
......@@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
}
}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
......
......@@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0;
}
int32_t tqMetaRestoreHandle(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
ASSERT(0);
return -1;
}
tdbTbcMoveToFirst(pCur);
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
ASSERT(0);
return -1;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
if (tqMetaRestoreHandle(pTq) < 0) {
return -1;
}
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (tqMetaRestoreCheckInfo(pTq) < 0) {
return -1;
}
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
return 0;
}
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
int32_t tqMetaClose(STQ* pTq) {
if (pTq->pExecStore) {
tdbTbClose(pTq->pExecStore);
}
tdbTbcClose(pCur);
if (pTq->pCheckStore) {
tdbTbClose(pTq->pCheckStore);
}
tdbClose(pTq->pMetaDB);
return 0;
}
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
ASSERT(0);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
return -1;
}
if (tqMetaRestoreHandle(pTq) < 0) {
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
return -1;
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
return -1;
}
return 0;
}
int32_t tqMetaClose(STQ* pTq) {
if (pTq->pExecStore) {
tdbTbClose(pTq->pExecStore);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0;
}
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
tdbClose(pTq->pMetaStore);
tdbTbcClose(pCur);
return 0;
}
......@@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
......@@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0);
}
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
......@@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
......@@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0;
}
int32_t tqMetaRestoreHandle(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
tdbTbcClose(pCur);
return 0;
}
......@@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......
......@@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq;
if (rollback) {
ASSERT(0);
tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err;
}
......
......@@ -108,29 +108,21 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
STbData *pTbData = NULL;
tb_uid_t suid = pMsgIter->suid;
tb_uid_t uid = pMsgIter->uid;
int32_t sverNew;
// check if table exists (todo: refact)
SMetaReader mr = {0};
// SMetaEntry me = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
metaReaderClear(&mr);
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
SMetaInfo info;
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
if (code) {
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
goto _err;
}
if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);
if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schemaRow.version;
} else {
tDecoderClear(&mr.coder);
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
sverNew = mr.me.stbEntry.schemaRow.version;
if (info.suid != suid) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
}
metaReaderClear(&mr);
pRsp->sver = sverNew;
if (info.suid) {
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
}
pRsp->sver = info.skmVer;
// create/get STbData to op
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
......@@ -157,7 +149,17 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
SVBufPool *pPool = pTsdb->pVnode->inUse;
TSDBKEY lastKey = {.version = version, .ts = eKey};
// check if table exists (todo)
// check if table exists
SMetaInfo info;
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
if (code) {
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
goto _err;
}
if (info.suid != suid) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
}
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
if (code) {
......
......@@ -1467,6 +1467,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
SColVal *pColVal = &colVal;
memset(pColAgg, 0, sizeof(*pColAgg));
bool minAssigned = false;
bool maxAssigned = false;
*pColAgg = (SColumnDataAgg){.colId = pColData->cid};
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
tColDataGetValue(pColData, iVal, pColVal);
......@@ -1481,72 +1485,86 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break;
case TSDB_DATA_TYPE_TINYINT: {
pColAgg->sum += colVal.value.i8;
if (pColAgg->min > colVal.value.i8) {
if (!minAssigned || pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
minAssigned = true;
}
if (pColAgg->max < colVal.value.i8) {
if (!maxAssigned || pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pColAgg->sum += colVal.value.i16;
if (pColAgg->min > colVal.value.i16) {
if (!minAssigned || pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
minAssigned = true;
}
if (pColAgg->max < colVal.value.i16) {
if (!maxAssigned || pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) {
if (!minAssigned || pColAgg->min > colVal.value.i32) {
pColAgg->min = colVal.value.i32;
minAssigned = true;
}
if (pColAgg->max < colVal.value.i32) {
if (!maxAssigned || pColAgg->max < colVal.value.i32) {
pColAgg->max = colVal.value.i32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
pColAgg->sum += colVal.value.i64;
if (pColAgg->min > colVal.value.i64) {
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (pColAgg->max < colVal.value.i64) {
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double*)(&pColAgg->sum) += colVal.value.f;
if (*(double*)(&pColAgg->min) > colVal.value.f) {
if (!minAssigned || *(double*)(&pColAgg->min) > colVal.value.f) {
*(double*)(&pColAgg->min) = colVal.value.f;
minAssigned = true;
}
if (*(double*)(&pColAgg->max) < colVal.value.f) {
if (!maxAssigned || *(double*)(&pColAgg->max) < colVal.value.f) {
*(double*)(&pColAgg->max) = colVal.value.f;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)(&pColAgg->sum) += colVal.value.d;
if (*(double*)(&pColAgg->min) > colVal.value.d) {
if (!minAssigned || *(double*)(&pColAgg->min) > colVal.value.d) {
*(double*)(&pColAgg->min) = colVal.value.d;
minAssigned = true;
}
if (*(double*)(&pColAgg->max) < colVal.value.d) {
if (!maxAssigned || *(double*)(&pColAgg->max) < colVal.value.d) {
*(double*)(&pColAgg->max) = colVal.value.d;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_VARCHAR:
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pColAgg->min > colVal.value.i64) {
if (!minAssigned || pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
minAssigned = true;
}
if (pColAgg->max < colVal.value.i64) {
if (!maxAssigned || pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
maxAssigned = true;
}
break;
}
......@@ -1554,41 +1572,49 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break;
case TSDB_DATA_TYPE_UTINYINT: {
pColAgg->sum += colVal.value.u8;
if (pColAgg->min > colVal.value.u8) {
if (!minAssigned || pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
minAssigned = true;
}
if (pColAgg->max < colVal.value.u8) {
if (!maxAssigned || pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
pColAgg->sum += colVal.value.u16;
if (pColAgg->min > colVal.value.u16) {
if (!minAssigned || pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
minAssigned = true;
}
if (pColAgg->max < colVal.value.u16) {
if (!maxAssigned || pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UINT: {
pColAgg->sum += colVal.value.u32;
if (pColAgg->min > colVal.value.u32) {
if (!minAssigned || pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
minAssigned = true;
}
if (pColAgg->max < colVal.value.u32) {
if (!minAssigned || pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
maxAssigned = true;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
pColAgg->sum += colVal.value.u64;
if (pColAgg->min > colVal.value.u64) {
if (!minAssigned || pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
minAssigned = true;
}
if (pColAgg->max < colVal.value.u64) {
if (!maxAssigned || pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
maxAssigned = true;
}
break;
}
......
......@@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
/* TQ */
case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_MQ_VG_DELETE:
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
break;
case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead), version) < 0) {
if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_CHECK_ALTER_INFO:
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
case TDMT_VND_ADD_CHECK_INFO:
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_DELETE_CHECK_INFO:
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
......@@ -869,7 +875,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
msgIter.uid = createTbReq.uid;
if (createTbReq.type == TSDB_CHILD_TABLE) {
......
......@@ -3315,7 +3315,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
......
......@@ -1175,19 +1175,19 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
if ((update || closedWin) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
......
......@@ -192,6 +192,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
return true;
}
static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
int32_t numOfSpaces = 0;
int32_t len = varDataLen(pVal->datum.p);
char* str = varDataVal(pVal->datum.p);
int32_t startPos = isLtrim ? 0 : len - 1;
int32_t step = isLtrim ? 1 : -1;
for (int32_t i = startPos; i < len || i >= 0; i += step) {
if (!isspace(str[i])) {
break;
}
numOfSpaces++;
}
return numOfSpaces;
}
void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0};
time_t t = taosTime(NULL);
......@@ -293,6 +311,40 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return TSDB_CODE_SUCCESS;
}
static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
int32_t numOfSpaces = 0;
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 0);
// for select trim functions with constant value from table,
// need to set the proper result result schema bytes to avoid
// trailing garbage characters
if (nodeType(pParamNode1) == QUERY_NODE_VALUE) {
SValueNode* pValue = (SValueNode*)pParamNode1;
numOfSpaces = countTrailingSpaces(pValue, isLtrim);
}
int32_t resBytes = pPara1->resType.bytes - numOfSpaces;
pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type};
return TSDB_CODE_SUCCESS;
}
static int32_t translateLtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTrimStr(pFunc, pErrBuf, len, true);
}
static int32_t translateRtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTrimStr(pFunc, pErrBuf, len, false);
}
static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (1 != numOfParams && 2 != numOfParams) {
......@@ -2827,7 +2879,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "ltrim",
.type = FUNCTION_TYPE_LTRIM,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.translateFunc = translateInOutStr,
.translateFunc = translateLtrim,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = ltrimFunction,
......@@ -2837,7 +2889,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "rtrim",
.type = FUNCTION_TYPE_RTRIM,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.translateFunc = translateInOutStr,
.translateFunc = translateRtrim,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = rtrimFunction,
......
......@@ -1210,7 +1210,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
*(int64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
......@@ -1223,7 +1223,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
*(uint64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
......@@ -1231,11 +1231,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double prev = 0;
GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
GET_TYPED_DATA(prev, double, type, &pBuf->v);
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
*(double*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
......@@ -1243,11 +1243,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
} else if (type == TSDB_DATA_TYPE_FLOAT) {
double prev = 0;
GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
GET_TYPED_DATA(prev, double, type, &pBuf->v);
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
*(double*)&pBuf->v = val;
}
if (pCtx->subsidiaries.num > 0) {
......
......@@ -758,7 +758,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
res->datum.p = taosMemoryCalloc(len, 1);
memcpy(res->datum.p, output.columnData->pData, len);
} else if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1);
res->node.resType.bytes = varDataTLen(output.columnData->pData);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
......
......@@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta);
}
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
......
......@@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \
if (cliMsg->type == Release) return; \
} \
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \
if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
transQueueClear(&conn->cliMsgs); \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
......@@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) {
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
CONN_SHOULD_RELEASE(conn, pHead);
if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs);
......@@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) {
}
STransMsg tmsg = {.info.handle = handle};
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
......
......@@ -69,7 +69,7 @@ class TDTestCase:
comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1])
else:
comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1])
if abs(comput_irate_value - irate_value) <= 0.0000001:
if abs(comput_irate_value - irate_value) <= 0.001: # set as 0.001 avoid floating point precision calculation errors
tdLog.info(" irate work as expected , sql is %s "% irate_sql)
else:
tdLog.exit(" irate work not as expected , sql is %s "% irate_sql)
......
......@@ -25,13 +25,13 @@ from util.cases import *
from util.sql import *
from util.dnodes import *
dbname = 'db'
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""):
def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""):
'''
mavg function:
......@@ -50,7 +50,7 @@ class TDTestCase:
return f"{sel} {func} {col} {m_comm} {k} {r_comm} {alias} {fr} {table_expr} {condition}"
def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""):
def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""):
# print(self.mavg_query_form(sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
# table_expr=table_expr, condition=condition))
line = sys._getframe().f_back.f_lineno
......@@ -62,7 +62,7 @@ class TDTestCase:
table_expr=table_expr, condition=condition
))
sql = "select * from t1"
sql = f"select * from {dbname}.t1"
collist = tdSql.getColNameList(sql)
if not isinstance(col, str):
......@@ -326,9 +326,9 @@ class TDTestCase:
self.checkmavg(**case6)
# # case7~8: nested query
# case7 = {"table_expr": "(select c1 from stb1)"}
# case7 = {"table_expr": f"(select c1 from {dbname}.stb1)"}
# self.checkmavg(**case7)
# case8 = {"table_expr": "(select mavg(c1, 1) c1 from stb1 group by tbname)"}
# case8 = {"table_expr": f"(select mavg(c1, 1) c1 from {dbname}.stb1 group by tbname)"}
# self.checkmavg(**case8)
# case9~10: mix with tbname/ts/tag/col
......@@ -362,7 +362,7 @@ class TDTestCase:
self.checkmavg(**case17)
# # case18~19: with group by
# case19 = {
# "table_expr": "stb1",
# "table_expr": f"{dbname}.stb1",
# "condition": "partition by tbname"
# }
# self.checkmavg(**case19)
......@@ -371,14 +371,14 @@ class TDTestCase:
# case20 = {"condition": "order by ts"}
# self.checkmavg(**case20)
#case21 = {
# "table_expr": "stb1",
# "table_expr": f"{dbname}.stb1",
# "condition": "group by tbname order by tbname"
#}
#self.checkmavg(**case21)
# # case22: with union
# case22 = {
# "condition": "union all select mavg( c1 , 1 ) from t2"
# "condition": f"union all select mavg( c1 , 1 ) from {dbname}.t2"
# }
# self.checkmavg(**case22)
......@@ -486,32 +486,33 @@ class TDTestCase:
#tdSql.query(" select mavg( c1 , 1 ) + 2 from t1 ")
err41 = {"alias": "+ avg(c1)"}
self.checkmavg(**err41) # mix with arithmetic 2
err42 = {"alias": ", c1"}
self.checkmavg(**err42) # mix with other col
# err43 = {"table_expr": "stb1"}
# err42 = {"alias": ", c1"}
# self.checkmavg(**err42) # mix with other col
# err43 = {"table_expr": f"{dbname}.stb1"}
# self.checkmavg(**err43) # select stb directly
err44 = {
"col": "stb1.c1",
"table_expr": "stb1, stb2",
"condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts"
}
self.checkmavg(**err44) # stb join
# err44 = {
# "col": "stb1.c1",
# "table_expr": "stb1, stb2",
# "condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts"
# }
# self.checkmavg(**err44) # stb join
tdSql.query("select mavg( stb1.c1 , 1 ) from stb1, stb2 where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts;")
err45 = {
"condition": "where ts>0 and ts < now interval(1h) fill(next)"
}
self.checkmavg(**err45) # interval
err46 = {
"table_expr": "t1",
"table_expr": f"{dbname}.t1",
"condition": "group by c6"
}
self.checkmavg(**err46) # group by normal col
err47 = {
"table_expr": "stb1",
"table_expr": f"{dbname}.stb1",
"condition": "group by tbname slimit 1 "
}
# self.checkmavg(**err47) # with slimit
err48 = {
"table_expr": "stb1",
"table_expr": f"{dbname}.stb1",
"condition": "group by tbname slimit 1 soffset 1"
}
# self.checkmavg(**err48) # with soffset
......@@ -554,8 +555,8 @@ class TDTestCase:
err67 = {"k": 0.999999}
self.checkmavg(**err67) # k: left out of [1, 1000]
err68 = {
"table_expr": "stb1",
"condition": "group by tbname order by tbname" # order by tbname not supported
"table_expr": f"{dbname}.stb1",
"condition": f"group by tbname order by tbname" # order by tbname not supported
}
self.checkmavg(**err68)
......@@ -565,42 +566,42 @@ class TDTestCase:
for i in range(tbnum):
for j in range(data_row):
tdSql.execute(
f"insert into t{i} values ("
f"insert into {dbname}.t{i} values ("
f"{basetime + (j+1)*10}, {random.randint(-200, -1)}, {random.uniform(200, -1)}, {basetime + random.randint(-200, -1)}, "
f"'binary_{j}', {random.uniform(-200, -1)}, {random.choice([0,1])}, {random.randint(-200,-1)}, "
f"{random.randint(-200, -1)}, {random.randint(-127, -1)}, 'nchar_{j}' )"
)
tdSql.execute(
f"insert into t{i} values ("
f"insert into {dbname}.t{i} values ("
f"{basetime - (j+1) * 10}, {random.randint(1, 200)}, {random.uniform(1, 200)}, {basetime - random.randint(1, 200)}, "
f"'binary_{j}_1', {random.uniform(1, 200)}, {random.choice([0, 1])}, {random.randint(1,200)}, "
f"{random.randint(1,200)}, {random.randint(1,127)}, 'nchar_{j}_1' )"
)
tdSql.execute(
f"insert into tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )"
f"insert into {dbname}.tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )"
)
pass
def mavg_test_table(self,tbnum: int) -> None :
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db keep 3650")
tdSql.execute("use db")
tdSql.execute(f"drop database if exists {dbname}")
tdSql.execute(f"create database if not exists {dbname} keep 3650")
tdSql.execute(f"use {dbname}")
tdSql.execute(
"create stable db.stb1 (\
f"create stable {dbname}.stb1 (\
ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, \
c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16)\
) \
tags(st1 int)"
)
tdSql.execute(
"create stable db.stb2 (ts timestamp, c1 int) tags(st2 int)"
f"create stable {dbname}.stb2 (ts timestamp, c1 int) tags(st2 int)"
)
for i in range(tbnum):
tdSql.execute(f"create table t{i} using stb1 tags({i})")
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
tdSql.execute(f"create table {dbname}.t{i} using {dbname}.stb1 tags({i})")
tdSql.execute(f"create table {dbname}.tt{i} using {dbname}.stb2 tags({i})")
pass
......@@ -617,25 +618,25 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert only NULL test:")
for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime - 5})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime + 5})")
tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime - 5})")
tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime + 5})")
self.mavg_current_query()
self.mavg_error_query()
tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):")
# self.mavg_test_table(tbnum)
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
# tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
# tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
# self.mavg_current_query()
# self.mavg_error_query()
tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):")
# self.mavg_test_table(tbnum)
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
# tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})")
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
# tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})")
# self.mavg_current_query()
# self.mavg_error_query()
......@@ -649,9 +650,9 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert data mix with NULL test:")
for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime-(per_table_rows+3)*10})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime+(per_table_rows+3)*10})")
tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime})")
tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime-(per_table_rows+3)*10})")
tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime+(per_table_rows+3)*10})")
self.mavg_current_query()
self.mavg_error_query()
......@@ -664,67 +665,64 @@ class TDTestCase:
tdDnodes.start(index)
self.mavg_current_query()
self.mavg_error_query()
tdSql.query("select mavg(1,1) from t1")
tdSql.query(f"select mavg(1,1) from {dbname}.t1")
tdSql.checkRows(7)
tdSql.checkData(0,0,1.000000000)
tdSql.checkData(1,0,1.000000000)
tdSql.checkData(5,0,1.000000000)
tdSql.query("select mavg(abs(c1),1) from t1")
tdSql.query(f"select mavg(abs(c1),1) from {dbname}.t1")
tdSql.checkRows(4)
def mavg_support_stable(self):
tdSql.query(" select mavg(1,3) from stb1 ")
tdSql.query(f" select mavg(1,3) from {dbname}.stb1 ")
tdSql.checkRows(68)
tdSql.checkData(0,0,1.000000000)
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ")
tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by tbname ")
tdSql.checkRows(20)
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.query(f"select mavg(st1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(50)
tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20)
# # bug need fix
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(2)
# tdSql.error("select mavg(st1+c1,3) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20)
# bug need fix
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(2)
tdSql.query(f"select tbname , mavg(c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20)
tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(50)
tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname slimit 1")
tdSql.checkRows(5)
# partition by tags
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1")
tdSql.checkRows(20)
tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1")
tdSql.checkRows(20)
tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1")
tdSql.checkRows(2)
tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1")
tdSql.checkRows(2)
# partition by col
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1 ,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
tdSql.query(f"select c1 , mavg(c1,3) from {dbname}.stb1 partition by c1")
tdSql.checkRows(0)
tdSql.query(f"select c1 , mavg(c1,1) from {dbname}.stb1 partition by c1")
tdSql.checkRows(40)
tdSql.query(f"select c1, c2, c3, c4, mavg(c1,3) from {dbname}.stb1 partition by tbname ")
tdSql.checkRows(20)
tdSql.query(f"select c1, c2, c3, c4, mavg(123,3) from {dbname}.stb1 partition by tbname ")
tdSql.checkRows(50)
def run(self):
import traceback
......
......@@ -873,7 +873,7 @@ class TDTestCase:
# bug need fix
tdSql.query("select c1 ,t1, sample(c1,2) from db.stb1 partition by c1 ")
tdSql.query("select sample(c1,2) from db.stb1 partition by c1 ")
# tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ")
tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ")
def run(self):
import traceback
......
......@@ -113,7 +113,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -R
python3 ./test.py -f 2-query/interp.py
python3 ./test.py -f 2-query/interp.py -R
python3 ./test.py -f 2-query/irate.py
# python3 ./test.py -f 2-query/irate.py -R
python3 ./test.py -f 2-query/irate.py -R
python3 ./test.py -f 2-query/join.py
python3 ./test.py -f 2-query/join.py -R
python3 ./test.py -f 2-query/last_row.py
......@@ -169,7 +169,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/csum.py
#python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/sample.py
python3 ./test.py -f 2-query/function_diff.py
python3 ./test.py -f 2-query/unique.py
......@@ -358,7 +358,7 @@ python3 ./test.py -f 2-query/interp.py -Q 2
python3 ./test.py -f 2-query/avg.py -Q 2
# python3 ./test.py -f 2-query/elapsed.py -Q 2
python3 ./test.py -f 2-query/csum.py -Q 2
#python3 ./test.py -f 2-query/mavg.py -Q 2
python3 ./test.py -f 2-query/mavg.py -Q 2
python3 ./test.py -f 2-query/sample.py -Q 2
python3 ./test.py -f 2-query/function_diff.py -Q 2
python3 ./test.py -f 2-query/unique.py -Q 2
......@@ -445,7 +445,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3
# python3 ./test.py -f 2-query/avg.py -Q 3
# python3 ./test.py -f 2-query/elapsed.py -Q 3
python3 ./test.py -f 2-query/csum.py -Q 3
#python3 ./test.py -f 2-query/mavg.py -Q 3
python3 ./test.py -f 2-query/mavg.py -Q 3
python3 ./test.py -f 2-query/sample.py -Q 3
python3 ./test.py -f 2-query/function_diff.py -Q 3
python3 ./test.py -f 2-query/unique.py -Q 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册