diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 05191138e549acc29cf8f60ab2772df105be8109..5f9a44084c414c3f9dc2427b186d90f628de37b5 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 22627d7 + GIT_TAG 7c641c5 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index d2aca5c361f15cbc9815dc65de08cd668e6a2b76..2eb4bba309be80245e8fc6b2ed6985a6cb7dac10 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -877,8 +877,8 @@ INTERP(expr) - The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds. - Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause). - `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable. -- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.1.4). -- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.2.3). 
+- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0). +- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0). ### LAST diff --git a/docs/en/14-reference/03-connector/08-node.mdx b/docs/en/14-reference/03-connector/08-node.mdx index 83479f91dab38d072f53952464153190ad3eaa03..b7c2e6b76537dc8b2a039684d2d712d91337cecd 100644 --- a/docs/en/14-reference/03-connector/08-node.mdx +++ b/docs/en/14-reference/03-connector/08-node.mdx @@ -32,7 +32,9 @@ Please refer to [version support list](/reference/connector#version-support) ## Supported features -### Native connectors + + + 1. Connection Management 2. General Query @@ -41,12 +43,16 @@ Please refer to [version support list](/reference/connector#version-support) 5. Subscription 6. Schemaless -### REST Connector + + 1. Connection Management 2. General Query 3. Continuous Query + + + ## Installation Steps ### Pre-installation preparation @@ -115,6 +121,9 @@ npm install @tdengine/rest ### Verify + + + After installing the TDengine client, use the `nodejsChecker.js` program to verify that the current environment supports Node.js access to TDengine. Verification in details: @@ -131,6 +140,28 @@ node nodejsChecker.js host=localhost - After executing the above steps, the command-line will output the result of `nodejsChecker.js` connecting to the TDengine instance and performing a simple insert and query. + + + +After installing the TDengine client, use the `restChecker.js` program to verify that the current environment supports Node.js access to TDengine. + +Verification in details: + +- Create an installation test folder such as `~/tdengine-test`. Download the [restChecker.js source code](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js) to your local. 
+ +- Execute the following command from the command-line. + +```bash +npm init -y +npm install @tdengine/rest +node restChecker.js +``` + +- After executing the above steps, the command-line will output the result of `restChecker.js` connecting to the TDengine instance and performing a simple insert and query. + + + + ## Establishing a connection Please choose to use one of the connectors. @@ -182,24 +213,69 @@ let cursor = conn.cursor(); #### SQL Write + + + + + + +```js +{{#include docs/examples/node/restexample/insert_example.js}} +``` + + + + #### InfluxDB line protocol write + + + + + + #### OpenTSDB Telnet line protocol write + + + + + + #### OpenTSDB JSON line protocol write + + + + + + ### Querying data + + + + + + + +```js +{{#include docs/examples/node/restexample/query_example.js}} +``` + + + + ## More sample programs diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index c88772cc9e4bc40d293009519496b5900aa66797..7075dc771eccc83a7cabb6807f34667de8f5fcc7 100644 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -179,9 +179,10 @@ The parameters described in this document by the effect that they have on the sy | Attribute | Description | | -------- | -------------------------------- | | Applicable | Server only | -| Meaning | count()/hyperloglog() return value or not if the result data is NULL | +| Meaning | count()/hyperloglog() return value or not if the input data is empty or NULL | | Vlue Range | 0:Return empty line,1:Return 0 | | Default | 1 | +| Notes | When this parameter is set to 1, for queries containing GROUP BY, PARTITION BY and INTERVAL clause, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values | ### maxNumOfDistinctRes diff --git a/docs/examples/node/restexample/insert_example.js b/docs/examples/node/restexample/insert_example.js new file mode 100644 index 
0000000000000000000000000000000000000000..d9fddf9f2888a878f369e7a2ee20c5ce49d0cc99 --- /dev/null +++ b/docs/examples/node/restexample/insert_example.js @@ -0,0 +1,19 @@ +const { options, connect } = require("@tdengine/rest"); + +async function sqlInsert() { + options.path = "/rest/sql"; + options.host = "localhost"; + options.port = 6041; + let conn = connect(options); + let cursor = conn.cursor(); + try { + let res = await cursor.query('CREATE DATABASE power'); + res = await cursor.query('CREATE STABLE power.meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int)'); + res = await cursor.query('INSERT INTO power.d1001 USING power.meters TAGS ("California.SanFrancisco", 2) VALUES (NOW, 10.2, 219, 0.32)'); + console.log("res.getResult()", res.getResult()); + } catch (err) { + console.log(err); + } +} +sqlInsert(); + diff --git a/docs/examples/node/restexample/query_example.js b/docs/examples/node/restexample/query_example.js new file mode 100644 index 0000000000000000000000000000000000000000..0edce64a24edddaf48b207d676f6d704d832ed68 --- /dev/null +++ b/docs/examples/node/restexample/query_example.js @@ -0,0 +1,16 @@ +const { options, connect } = require("@tdengine/rest"); + +async function query() { + options.path = "/rest/sql"; + options.host = "localhost"; + options.port = 6041; + let conn = connect(options); + let cursor = conn.cursor(); + try { + let res = await cursor.query('select * from power.meters'); + console.log("res.getResult()", res.getResult()); + } catch (err) { + console.log(err); + } +} +query(); diff --git a/docs/examples/node/restexample/restChecker.js b/docs/examples/node/restexample/restChecker.js new file mode 100644 index 0000000000000000000000000000000000000000..a999684e575d4271e5a62895a5b1678a37d16a65 --- /dev/null +++ b/docs/examples/node/restexample/restChecker.js @@ -0,0 +1,78 @@ +const { options, connect } = require("@tdengine/rest"); +options.path = '/rest/sql/' +options.host = 'localhost'; 
+options.port = 6041; +options.user = "root"; +options.passwd = "taosdata"; + +//optional +// options.url = "http://127.0.0.1:6041"; + +const db = 'rest_ts_db'; +const table = 'rest' +const createDB = `create database if not exists ${db} keep 3650`; +const dropDB = `drop database if exists ${db}`; +const createTB = `create table if not exists ${db}.${table}(ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(40),nchr nchar(40))`; +const addColumn = `alter table ${db}.${table} add column new_column nchar(40) `; +const dropColumn = `alter table ${db}.${table} drop column new_column`; +const insertSql = `insert into ${db}.${table} values('2022-03-30 18:30:51.567',1,2,3,4,'binary1','nchar1')` + + `('2022-03-30 18:30:51.568',5,6,7,8,'binary2','nchar2')` + + `('2022-03-30 18:30:51.569',9,0,1,2,'binary3','nchar3')`; +const querySql = `select * from ${db}.${table}`; +const errorSql = 'show database'; + +let conn = connect(options); +let cursor = conn.cursor(); + +async function execute(sql, pure = true) { + let result = await cursor.query(sql, pure); + // print query result as taos shell + // Get Result object, return Result object. + console.log("result.getResult()",result.getResult()); + // Get Meta data, return Meta[]|undefined(when execute failed this is undefined). + console.log("result.getMeta()",result.getMeta()); + // Get data,return Array>|undefined(when execute failed this is undefined). + console.log("result.getData()",result.getData()); + // Get affect rows,return number|undefined(when execute failed this is undefined). + console.log("result.getAffectRows()",result.getAffectRows()); + // Get command,return SQL send to server(need to `query(sql,false)`,set 'pure=false',default true). + console.log("result.getCommand()",result.getCommand()); + // Get error code ,return number|undefined(when execute failed this is undefined). 
+ console.log("result.getErrCode()",result.getErrCode()); + // Get error string,return string|undefined(when execute failed this is undefined). + console.log("result.getErrStr()",result.getErrStr()); +} + +(async () => { + // start execute time + let start = new Date().getTime(); + await execute(createDB); + console.log("-----------------------------------") + + await execute(createTB); + console.log("-----------------------------------") + + await execute(addColumn); + console.log("----------------------------------") + + await execute(dropColumn); + console.log("-----------------------------------") + + await execute(insertSql); + console.log("-----------------------------------") + + await execute(querySql); + console.log("-----------------------------------") + + await execute(errorSql); + console.log("-----------------------------------") + + await execute(dropDB); + // finish time + let end = new Date().getTime(); + console.log("total spend time:%d ms",end - start); +})() + + + + diff --git a/docs/zh/08-connector/35-node.mdx b/docs/zh/08-connector/35-node.mdx index f2aff41da2da7967410256550d3cfca30c31423b..25f8bdf1775357f4c81d11e37bfeb4f6f13a3d09 100644 --- a/docs/zh/08-connector/35-node.mdx +++ b/docs/zh/08-connector/35-node.mdx @@ -31,7 +31,8 @@ REST 连接器支持所有能运行 Node.js 的平台。 ## 支持的功能特性 -### 原生连接器 + + 1. 连接管理 2. 普通查询 @@ -40,12 +41,17 @@ REST 连接器支持所有能运行 Node.js 的平台。 5. 订阅功能 6. Schemaless -### REST 连接器 + + 1. 连接管理 2. 普通查询 3. 
连续查询 + + + + ## 安装步骤 ### 安装前准备 @@ -114,6 +120,9 @@ npm install @tdengine/rest ### 安装验证 + + + 在安装好 TDengine 客户端后,使用 nodejsChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。 验证方法: @@ -130,11 +139,35 @@ node nodejsChecker.js host=localhost - 执行以上步骤后,在命令行会输出 nodejsChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。 + + + +在安装好 TDengine 客户端后,使用 restChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。 + +验证方法: + +- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [restChecker.js 源代码](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js)到本地。 + +- 在命令行中执行以下命令。 + +```bash +npm init -y +npm install @tdengine/rest +node restChecker.js +``` + +- 执行以上步骤后,在命令行会输出 restChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。 + + + + + + ## 建立连接 请选择使用一种连接器。 - + 安装并引用 `@tdengine/client` 包。 @@ -181,24 +214,71 @@ let cursor = conn.cursor(); #### SQL 写入 + + + + + + +```js +{{#include docs/examples/node/restexample/insert_example.js}} +``` + + + + + + #### InfluxDB 行协议写入 + + + + + + #### OpenTSDB Telnet 行协议写入 + + + + + + #### OpenTSDB JSON 行协议写入 + + + + + + ### 查询数据 + + + + + + + +```js +{{#include docs/examples/node/restexample/query_example.js}} +``` + + + + ## 更多示例程序 diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index b21d9c25dd38c602a44234897085dfdadd977065..647f91742294a8fdd7a174c9d68e581fe691c94e 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -879,8 +879,8 @@ INTERP(expr) - INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值. 
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句) - INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用。 -- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.1.4版本以后支持)。 -- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.2.3版本以后支持)。 +- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。 +- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。 ### LAST diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index c903ad7df699a6b3284e112d1befeb80de3a03cd..eed8b9ca7dc0d0e6d8801b64071377062b523358 100644 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -192,14 +192,15 @@ taos --dump-config | 取值范围 | 0 表示包含函数名,1 表示不包含函数名。 | | 缺省值 | 0 | -### countAlwaysReturnValue +### countAlwaysReturnValue | 属性 | 说明 | | -------- | -------------------------------- | | 适用范围 | 仅服务端适用 | -| 含义 | count/hyperloglog函数在数据为空或者NULL的情况下是否返回值 | +| 含义 | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值 | | 取值范围 | 0:返回空行,1:返回 0 | | 缺省值 | 1 | +| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 | ## 区域相关 diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index d7a62f5402defb95da6a4217254e14dbbc6de56c..a36a7513f3dcd683ad4c0d3d79a7259617c8754f 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -205,7 +205,7 @@ struct SColData { int32_t numOfNull; // # of null int32_t numOfValue; // # of vale int32_t nVal; - uint8_t flag; + int8_t flag; uint8_t *pBitMap; int32_t *aOffset; int32_t nData; diff --git a/packaging/tools/mac_before_install.txt b/packaging/tools/mac_before_install.txt index 3b6d610e882e9d2b6d9d8986f23fd1a5b8f9cb0a..a58bab7b6560fc01dac28c073fc66e73ec391b11 100644 --- a/packaging/tools/mac_before_install.txt +++ b/packaging/tools/mac_before_install.txt @@ -1,4 +1,4 @@ -TDengine is a high-efficient, scalable, high-available distributed time-series 
database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data. +TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing. To configure TDengine : edit /etc/taos/taos.cfg To start service : launchctl start com.tdengine.taosd diff --git a/packaging/tools/mac_before_install_client.txt b/packaging/tools/mac_before_install_client.txt new file mode 100644 index 0000000000000000000000000000000000000000..d66fa673589bed6d560a9228269c5de2f4005e9f --- /dev/null +++ b/packaging/tools/mac_before_install_client.txt @@ -0,0 +1,7 @@ +TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing. 
+ +Once it's installed, please take the steps below: +1: open a terminal/shell in Mac +2: if connecting to Cloud Service, follow the instructions on your cloud service account and configure the environment variable +3: if connecting to another TDengine Service, you can also view help information via "taos --help" +4: execute command taos \ No newline at end of file diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 70cdd291e44a355aeded4ab038e673d34b3277d8..a12f94cf1d88f3f52c958e20dfa130f98813da70 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -25,7 +25,9 @@ #include "tref.h" #include "ttimer.h" -static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } +static tb_uid_t processSuid(tb_uid_t suid, char* db){ + return suid + MurmurHash3_32(db, strlen(db)); +} static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t) { @@ -1642,6 +1644,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } if (strcmp(tbName, pCreateReq.name) == 0) { cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); + pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); tDecoderClear(&decoderTmp); break; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 99f0ed77039f8d19c96b2f4100f5aeee9e3cf557..04af05cc4472666d1be8411deaadfc0e185b15f6 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1358,7 +1358,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } taosArrayDestroy(qa); } else { - // TODO handle delete table from stb + tqReaderRemoveTbUidList(pExec->execHandle.pExecReader, tbUidList); } } } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 
3801a25d6db74e53edb991c104318dd9776b4b07..29a25e4cd09b5abb279fceaf4108b5bf53f8f058 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -498,7 +498,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* taosArrayPush(tagArray, &tagVal); } } - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + pCreateTbReq->ctb.tagNum = size; STag* pTag = NULL; tTagNew(tagArray, 1, false, &pTag); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 02d076113f6cdab7066740ce071426ad06efb7d4..301b504346d80190653ea420c311e23ef7e7464b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -22,7 +22,7 @@ * us: 3600*1000000*8765*1000 // 1970 + 1000 years * ns: 3600*1000000000*8765*292 // 1970 + 292 years */ -static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; +int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; // static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); @@ -60,23 +60,6 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 return 0; } -#if 0 -static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey, - TSKEY now) { - TSKEY rowKey = TD_ROW_KEY(row); - if (rowKey < minKey || rowKey > maxKey) { - tsdbError("vgId:%d, table %s tid %d uid %" PRIu64 " timestamp is out of range! 
now %" PRId64 " minKey %" PRId64 - " maxKey %" PRId64 " row key %" PRId64, - REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, - rowKey); - terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - return -1; - } - - return 0; -} -#endif - static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey, TSKEY now) { if (rowKey < minKey || rowKey > maxKey) { @@ -89,79 +72,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK return 0; } -#if 0 -int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { - ASSERT(pMsg != NULL); - // STsdbMeta * pMeta = pTsdb->tsdbMeta; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - STsdbKeepCfg *pCfg = &pTsdb->keepCfg; - TSKEY now = taosGetTimestamp(pCfg->precision); - TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; - TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; - - terrno = TSDB_CODE_SUCCESS; - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - // pBlock->uid = htobe64(pBlock->uid); - // pBlock->suid = htobe64(pBlock->suid); - // pBlock->sversion = htonl(pBlock->sversion); - // pBlock->dataLen = htonl(pBlock->dataLen); - // pBlock->schemaLen = htonl(pBlock->schemaLen); - // pBlock->numOfRows = htonl(pBlock->numOfRows); - -#if 0 - if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { - tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - STable *pTable = pMeta->tables[pBlock->tid]; - if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { - tsdbError("vgId:%d, failed to 
get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d, invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) { - if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - continue; - } else { - return -1; - } - } -#endif - tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); - while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { - if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) { - return -1; - } - } - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} -#endif - int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { int32_t code = 0; STsdbKeepCfg *pCfg = &pTsdb->keepCfg; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cb85ff8dd370c906d52ab9751d5ec8dc6a94736f..e057eeaace3b03dbe7d50aaf8d21065fa6beae9f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -13,7 +13,11 @@ * along with this program. If not, see . 
*/ +#include "tencode.h" +#include "tmsg.h" #include "vnd.h" +#include "vnode.h" +#include "vnodeInt.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -31,158 +35,254 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { - int32_t code = 0; +static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) { + int32_t code = 0; + int32_t lino = 0; + + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // flags + if (tDecodeI32v(pCoder, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // name + char *name = NULL; + if (tDecodeCStr(pCoder, &name) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // uid + int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); + if (uid == 0) { + uid = tGenIdPI64(); + } + *(int64_t *)(pCoder->data + pCoder->pos) = uid; + + // ctime + *(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime; + + tEndDecode(pCoder); + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid); + if (pUid) *pUid = uid; + } + return code; +} +static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + int64_t ctime = taosGetTimestampMs(); 
SDecoder dc = {0}; + int32_t nReqs; - switch (pMsg->msgType) { - case TDMT_VND_CREATE_TABLE: { - int64_t ctime = taosGetTimestampMs(); - int32_t nReqs; + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + if (tStartDecode(&dc) < 0) { + code = TSDB_CODE_INVALID_MSG; + return code; + } - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } + if (tDecodeI32v(&dc, &nReqs) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t iReq = 0; iReq < nReqs; iReq++) { + code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (tDecodeI32v(&dc, &nReqs) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - for (int32_t iReq = 0; iReq < nReqs; iReq++) { - tb_uid_t uid = tGenIdPI64(); - char *name = NULL; - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + tEndDecode(&dc); - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; +_exit: + tDecoderClear(&dc); + return code; +} +extern int64_t tsMaxKeyByPrecision[]; +static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) { + int32_t code = 0; + int32_t lino = 0; - vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid); - tEndDecode(&dc); - } + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_SUBMIT: { - int64_t ctime = taosGetTimestampMs(); + SSubmitTbData submitTbData; + if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) { + code = 
TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg)); - tStartDecode(&dc); + int64_t uid; + if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid); + TSDB_CHECK_CODE(code, lino, _exit); + } - uint64_t nSubmitTbData; - if (tDecodeU64v(&dc, &nSubmitTbData) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + // submit data + if (tDecodeI64(pCoder, &submitTbData.suid) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - for (int32_t i = 0; i < nSubmitTbData; i++) { - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + *(int64_t *)(pCoder->data + pCoder->pos) = uid; + pCoder->pos += sizeof(int64_t); + } else { + tDecodeI64(pCoder, &submitTbData.uid); + } - int32_t flags; - if (tDecodeI32v(&dc, &flags) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { - // SVCreateTbReq - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - char *name = NULL; - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); - if (uid == 0) { - uid = tGenIdPI64(); - } - - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; - - tEndDecode(&dc); - - // SSubmitTbData - int64_t suid; - if (tDecodeI64(&dc, &suid) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - *(int64_t *)(dc.data + dc.pos) = uid; - } + // scan and check + TSKEY now = ctime; + if 
(pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) { + now *= 1000; + } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { + now *= 1000000; + } + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2; + TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; + if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData; + if (tDecodeU64v(pCoder, &nColData) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } - tEndDecode(&dc); + SColData colData = {0}; + pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); + + for (int32_t iRow = 0; iRow < colData.nVal; iRow++) { + if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; } + } + } else { + uint64_t nRow; + if (tDecodeU64v(pCoder, &nRow) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_DELETE: { - int32_t size; - int32_t ret; - uint8_t *pCont; - SEncoder *pCoder = &(SEncoder){0}; - SDeleteRes res = {0}; - SReadHandle handle = { - .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - - code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - if (code) { - goto _err; + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow *pRow = (SRow *)(pCoder->data + pCoder->pos); + pCoder->pos += pRow->len; + + if (pRow->ts < minKey || pRow->ts > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; } + } + } + + tEndDecode(pCoder); + +_exit: + return code; +} +static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + SDecoder *pCoder = &(SDecoder){0}; + + tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg)); - // malloc and encode - 
tEncodeSize(tEncodeDeleteRes, &res, size, ret); - pCont = rpcMallocCont(size + sizeof(SMsgHead)); + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); - ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + uint64_t nSubmitTbData; + if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); - tEncodeDeleteRes(pCoder, &res); - tEncoderClear(pCoder); + int64_t ctime = taosGetTimestampMs(); + for (int32_t i = 0; i < nSubmitTbData; i++) { + code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime); + TSDB_CHECK_CODE(code, lino, _exit); + } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = pCont; - pMsg->contLen = size + sizeof(SMsgHead); + tEndDecode(pCoder); - taosArrayDestroy(res.uidList); +_exit: + tDecoderClear(pCoder); + return code; +} + +static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + + int32_t size; + int32_t ret; + uint8_t *pCont; + SEncoder *pCoder = &(SEncoder){0}; + SDeleteRes res = {0}; + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + + code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); + if (code) goto _exit; + + // malloc and encode + tEncodeSize(tEncodeDeleteRes, &res, size, ret); + pCont = rpcMallocCont(size + sizeof(SMsgHead)); + + ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); + ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + + tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); + tEncodeDeleteRes(pCoder, &res); + tEncoderClear(pCoder); + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = pCont; + pMsg->contLen = size + sizeof(SMsgHead); + + taosArrayDestroy(res.uidList); + +_exit: + return code; +} + +int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + + switch 
(pMsg->msgType) { + case TDMT_VND_CREATE_TABLE: { + code = vnodePreProcessCreateTableMsg(pVnode, pMsg); + } break; + case TDMT_VND_SUBMIT: { + code = vnodePreProcessSubmitMsg(pVnode, pMsg); + } break; + case TDMT_VND_DELETE: { + code = vnodePreProcessDeleteMsg(pVnode, pMsg); } break; default: break; } - return code; - -_err: - vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code)); +_exit: + if (code) { + vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code), + pMsg->msgType); + } return code; } @@ -1055,7 +1155,6 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { -#if 1 int32_t code = 0; terrno = 0; @@ -1089,12 +1188,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq tDecoderClear(&dc); } - // check - code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq); - if (code) { - goto _exit; - } - for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); @@ -1243,145 +1336,6 @@ _exit: } return code; - -#else - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - SSubmitRsp submitRsp = {0}; - int32_t nRows = 0; - int32_t tsize, ret; - SEncoder encoder = {0}; - SArray *newTbUids = NULL; - SVStatis statis = {0}; - bool tbCreated = false; - terrno = TSDB_CODE_SUCCESS; - - pRsp->code = 0; - pSubmitReq->version = version; - statis.nBatchInsert = 1; - - if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { - pRsp->code = terrno; - goto _exit; - } - - submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); - newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); - if (!submitRsp.pArray || !newTbUids) { - pRsp->code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - for (;;) { - 
tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; - - SSubmitBlkRsp submitBlkRsp = {0}; - tbCreated = false; - - // create table for auto create table mode - if (msgIter.schemaLen > 0) { - // tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); - // if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { - // pRsp->code = TSDB_CODE_INVALID_MSG; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { - // if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - // submitBlkRsp.code = terrno; - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - } else { - if (NULL != submitBlkRsp.pMeta) { - vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); - } - - // taosArrayPush(newTbUids, &createTbReq.uid); - - submitBlkRsp.uid = createTbReq.uid; - submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); - sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); - tbCreated = true; - } - - // msgIter.uid = createTbReq.uid; - // if (createTbReq.type == TSDB_CHILD_TABLE) { - // msgIter.suid = createTbReq.ctb.suid; - // } else { - // msgIter.suid = 0; - // } - - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - } - - if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { - submitBlkRsp.code = terrno; - } - - submitRsp.numOfRows += submitBlkRsp.numOfRows; - 
submitRsp.affectedRows += submitBlkRsp.affectedRows; - if (tbCreated || submitBlkRsp.code) { - taosArrayPush(submitRsp.pArray, &submitBlkRsp); - } - } - - // if (taosArrayGetSize(newTbUids) > 0) { - // vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), - // (int32_t)taosArrayGetSize(newTbUids)); - // } - - // tqUpdateTbUidList(pVnode->pTq, newTbUids, true); - -_exit: - taosArrayDestroy(newTbUids); - // tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); - // pRsp->pCont = rpcMallocCont(tsize); - // pRsp->contLen = tsize; - // tEncoderInit(&encoder, pRsp->pCont, tsize); - // tEncodeSSubmitRsp(&encoder, &submitRsp); - // tEncoderClear(&encoder); - - taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); - - // TODO: the partial success scenario and the error case - // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level - // 1/level 2. - // TODO: refactor - if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - statis.nBatchInsertSuccess = 1; - tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); - } - - // N.B. 
not strict as the following procedure is not atomic - atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows); - atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); - atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert); - atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess); - - vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); - return 0; -#endif - return 0; } static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 549ce6ae79f2dfe9ac24092628c167b52b05bf06..1712cba0f53ac3e3f5cf69d3ad8e257851fd55f0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -873,7 +873,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); -void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 41e4c990f851a3cbcaf3a5982e16dc024caf0eab..f30fe30e351767280bb0e254b20d6f7cd140f273 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1259,9 +1259,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, 
TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = 0; } break; + case STREAM_CREATE_CHILD_TABLE: { + return pBlock; + } break; default: - ASSERT(0); - break; + ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 370e1f62de382191d8fbb5915c5f348032688504..9fd8f7d3a24e79aba16b266ba4bfa3d681d1cf61 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -985,11 +985,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { return pDest; } -void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { void* pValue = NULL; if (streamStateGetParName(pState, groupId, &pValue) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); + memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + pTmpBlock->info.id.groupId = groupId; if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); @@ -999,6 +1001,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(tbName, varDataVal(pData), len); streamStatePutParName(pState, groupId, tbName); + memcpy(pTmpBlock->info.parTbName, tbName, len); pDestBlock->info.rows--; } else { void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); @@ -1013,7 +1016,6 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); 
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); - pDestBlock->info.rows++; blockDataDestroy(pTmpBlock); } @@ -1030,7 +1032,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions)); SSDataBlock* pSrc = pInfo->pInputDataBlock; - while (pInfo->pTbNameIte != NULL) { + if (pInfo->pTbNameIte != NULL) { SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0); appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1b9767f193f7b28d190f9cc2ccd1985b225268d7..792225e16c272777964f9d018a7708e3353121c5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4778,6 +4778,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + printDataBlock(pBlock, "single interval"); return pBlock; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e1baac4a0fe597dee33212289c501a0468b7a531..38c0b3e1ee400f7cc12df6c781e3123015fd6d2a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -744,7 +744,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py -#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh 
python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim index b60ab0ac05b5e9b88819c50d07ef1661e02cb481..fda78af621f515ac142243cd328be691b4e9711d 100644 --- a/tests/script/tsim/stream/checkStreamSTable.sim +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -261,6 +261,21 @@ if $data04 != NULL then goto loop2 endi +print ===== drop ... + +sql drop stream if exists streams0; +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop database if exists test; +sql drop database if exists test1; +sql drop database if exists test2; +sql drop database if exists test3; +sql drop database if exists result; +sql drop database if exists result1; +sql drop database if exists result2; +sql drop database if exists result3; + print ===== step6 sql create database result4 vgroups 1;