提交 b7f4b93d 编写于 作者: X Xiaoyu Wang

merge 3.0

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 22627d7
GIT_TAG 7c641c5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -877,8 +877,8 @@ INTERP(expr)
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable.
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.1.4).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.2.3).
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0).
### LAST
......
......@@ -32,7 +32,9 @@ Please refer to [version support list](/reference/connector#version-support)
## Supported features
### Native connectors
<Tabs defaultValue="native">
<TabItem value="native" label="Native connector">
1. Connection Management
2. General Query
......@@ -41,12 +43,16 @@ Please refer to [version support list](/reference/connector#version-support)
5. Subscription
6. Schemaless
### REST Connector
</TabItem>
<TabItem value="rest" label="REST connector">
1. Connection Management
2. General Query
3. Continuous Query
</TabItem>
</Tabs>
## Installation Steps
### Pre-installation preparation
......@@ -115,6 +121,9 @@ npm install @tdengine/rest
### Verify
<Tabs defaultValue="native">
<TabItem value="native" label="Native connector">
After installing the TDengine client, use the `nodejsChecker.js` program to verify that the current environment supports Node.js access to TDengine.
Verification in details:
......@@ -131,6 +140,28 @@ node nodejsChecker.js host=localhost
- After executing the above steps, the command-line will output the result of `nodejsChecker.js` connecting to the TDengine instance and performing a simple insert and query.
</TabItem>
<TabItem value="rest" label="REST connector">
After installing the TDengine client, use the `restChecker.js` program to verify that the current environment supports Node.js access to TDengine.
Verification in details:
- Create an installation test folder such as `~/tdengine-test`. Download the [restChecker.js source code](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js) to your local.
- Execute the following command from the command-line.
```bash
npm init -y
npm install @tdengine/rest
node restChecker.js
```
- After executing the above steps, the command-line will output the result of `restChecker.js` connecting to the TDengine instance and performing a simple insert and query.
</TabItem>
</Tabs>
## Establishing a connection
Please choose to use one of the connectors.
......@@ -182,24 +213,69 @@ let cursor = conn.cursor();
#### SQL Write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeInsert />
</TabItem>
<TabItem value="rest" label="REST connection">
```js
{{#include docs/examples/node/restexample/insert_example.js}}
```
</TabItem>
</Tabs>
#### InfluxDB line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeInfluxLine />
</TabItem>
</Tabs>
#### OpenTSDB Telnet line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeOpenTSDBTelnet />
</TabItem>
</Tabs>
#### OpenTSDB JSON line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeOpenTSDBJson />
</TabItem>
</Tabs>
### Querying data
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeQuery />
</TabItem>
<TabItem value="rest" label="REST connection">
```js
{{#include docs/examples/node/restexample/query_example.js}}
```
</TabItem>
</Tabs>
## More sample programs
......
......@@ -179,9 +179,10 @@ The parameters described in this document by the effect that they have on the sy
| Attribute | Description |
| -------- | -------------------------------- |
| Applicable | Server only |
| Meaning | count()/hyperloglog() return value or not if the result data is NULL |
| Meaning | count()/hyperloglog() return value or not if the input data is empty or NULL |
| Vlue Range | 0:Return empty line,1:Return 0 |
| Default | 1 |
| Notes | When this parameter is setting to 1, for queries containing GROUP BY, PARTITION BY and INTERVAL clause, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values |
### maxNumOfDistinctRes
......
const { options, connect } = require("@tdengine/rest");
async function sqlInsert() {
options.path = "/rest/sql";
options.host = "localhost";
options.port = 6041;
let conn = connect(options);
let cursor = conn.cursor();
try {
let res = await cursor.query('CREATE DATABASE power');
res = await cursor.query('CREATE STABLE power.meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int)');
res = await cursor.query('INSERT INTO power.d1001 USING power.meters TAGS ("California.SanFrancisco", 2) VALUES (NOW, 10.2, 219, 0.32)');
console.log("res.getResult()", res.getResult());
} catch (err) {
console.log(err);
}
}
sqlInsert();
const { options, connect } = require("@tdengine/rest");
async function query() {
options.path = "/rest/sql";
options.host = "localhost";
options.port = 6041;
let conn = connect(options);
let cursor = conn.cursor();
try {
let res = await cursor.query('select * from power.meters');
console.log("res.getResult()", res.getResult());
} catch (err) {
console.log(err);
}
}
query();
const { options, connect } = require("@tdengine/rest");
options.path = '/rest/sql/'
options.host = 'localhost';
options.port = 6041;
options.user = "root";
options.passwd = "taosdata";
//optional
// options.url = "http://127.0.0.1:6041";
const db = 'rest_ts_db';
const table = 'rest'
const createDB = `create database if not exists ${db} keep 3650`;
const dropDB = `drop database if exists ${db}`;
const createTB = `create table if not exists ${db}.${table}(ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(40),nchr nchar(40))`;
const addColumn = `alter table ${db}.${table} add column new_column nchar(40) `;
const dropColumn = `alter table ${db}.${table} drop column new_column`;
const insertSql = `insert into ${db}.${table} values('2022-03-30 18:30:51.567',1,2,3,4,'binary1','nchar1')` +
`('2022-03-30 18:30:51.568',5,6,7,8,'binary2','nchar2')` +
`('2022-03-30 18:30:51.569',9,0,1,2,'binary3','nchar3')`;
const querySql = `select * from ${db}.${table}`;
const errorSql = 'show database';
let conn = connect(options);
let cursor = conn.cursor();
async function execute(sql, pure = true) {
let result = await cursor.query(sql, pure);
// print query result as taos shell
// Get Result object, return Result object.
console.log("result.getResult()",result.getResult());
// Get Meta data, return Meta[]|undefined(when execute failed this is undefined).
console.log("result.getMeta()",result.getMeta());
// Get data,return Array<Array<any>>|undefined(when execute failed this is undefined).
console.log("result.getData()",result.getData());
// Get affect rows,return number|undefined(when execute failed this is undefined).
console.log("result.getAffectRows()",result.getAffectRows());
// Get command,return SQL send to server(need to `query(sql,false)`,set 'pure=false',default true).
console.log("result.getCommand()",result.getCommand());
// Get error code ,return number|undefined(when execute failed this is undefined).
console.log("result.getErrCode()",result.getErrCode());
// Get error string,return string|undefined(when execute failed this is undefined).
console.log("result.getErrStr()",result.getErrStr());
}
(async () => {
// start execute time
let start = new Date().getTime();
await execute(createDB);
console.log("-----------------------------------")
await execute(createTB);
console.log("-----------------------------------")
await execute(addColumn);
console.log("----------------------------------")
await execute(dropColumn);
console.log("-----------------------------------")
await execute(insertSql);
console.log("-----------------------------------")
await execute(querySql);
console.log("-----------------------------------")
await execute(errorSql);
console.log("-----------------------------------")
await execute(dropDB);
// finish time
let end = new Date().getTime();
console.log("total spend time:%d ms",end - start);
})()
......@@ -31,7 +31,8 @@ REST 连接器支持所有能运行 Node.js 的平台。
## 支持的功能特性
### 原生连接器
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接器">
1. 连接管理
2. 普通查询
......@@ -40,12 +41,17 @@ REST 连接器支持所有能运行 Node.js 的平台。
5. 订阅功能
6. Schemaless
### REST 连接器
</TabItem>
<TabItem value="rest" label="REST 连接器">
1. 连接管理
2. 普通查询
3. 连续查询
</TabItem>
</Tabs>
## 安装步骤
### 安装前准备
......@@ -114,6 +120,9 @@ npm install @tdengine/rest
### 安装验证
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接器">
在安装好 TDengine 客户端后,使用 nodejsChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。
验证方法:
......@@ -130,11 +139,35 @@ node nodejsChecker.js host=localhost
- 执行以上步骤后,在命令行会输出 nodejsChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
</TabItem>
<TabItem value="rest" label="REST 连接器">
在安装好 TDengine 客户端后,使用 nodejsChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。
验证方法:
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [restChecker.js 源代码](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js)到本地。
- 在命令行中执行以下命令。
```bash
npm init -y
npm install @tdengine/rest
node restChecker.js
```
- 执行以上步骤后,在命令行会输出 restChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
</TabItem>
</Tabs>
## 建立连接
请选择使用一种连接器。
<Tabs defaultValue="rest">
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
安装并引用 `@tdengine/client` 包。
......@@ -181,24 +214,71 @@ let cursor = conn.cursor();
#### SQL 写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeInsert />
</TabItem>
<TabItem value="rest" label="REST 连接">
```js
{{#include docs/examples/node/restexample/insert_example.js}}
```
</TabItem>
</Tabs>
#### InfluxDB 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeInfluxLine />
</TabItem>
</Tabs>
#### OpenTSDB Telnet 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeOpenTSDBTelnet />
</TabItem>
</Tabs>
#### OpenTSDB JSON 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeOpenTSDBJson />
</TabItem>
</Tabs>
### 查询数据
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeQuery />
</TabItem>
<TabItem value="rest" label="REST 连接">
```js
{{#include docs/examples/node/restexample/query_example.js}}
```
</TabItem>
</Tabs>
## 更多示例程序
......
......@@ -879,8 +879,8 @@ INTERP(expr)
- INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值.
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句)
- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用。
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.1.4版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.2.3版本以后支持)。
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。
### LAST
......
......@@ -197,9 +197,10 @@ taos --dump-config
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | count/hyperloglog函数在数据为空或者NULL的情况下是否返回值 |
| 含义 | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值 |
| 取值范围 | 0:返回空行,1:返回 0 |
| 缺省值 | 1 |
| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 |
## 区域相关
......
......@@ -205,7 +205,7 @@ struct SColData {
int32_t numOfNull; // # of null
int32_t numOfValue; // # of vale
int32_t nVal;
uint8_t flag;
int8_t flag;
uint8_t *pBitMap;
int32_t *aOffset;
int32_t nData;
......
TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data.
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
To configure TDengine : edit /etc/taos/taos.cfg
To start service : launchctl start com.tdengine.taosd
......
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
Once it's installed, please take the steps below:
1: open a terminal/shell in Mac
2: if connecting to Cloud Service, follow the instructions on your cloud service account and configure the environment variable
3: if connecting to another TDengine Service, you can also view help information via "taos --help"
4: execute command taos
\ No newline at end of file
......@@ -25,7 +25,9 @@
#include "tref.h"
#include "ttimer.h"
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static tb_uid_t processSuid(tb_uid_t suid, char* db){
return suid + MurmurHash3_32(db, strlen(db));
}
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
......@@ -1642,6 +1644,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
if (strcmp(tbName, pCreateReq.name) == 0) {
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb);
tDecoderClear(&decoderTmp);
break;
}
......
......@@ -1358,7 +1358,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
taosArrayDestroy(qa);
} else {
// TODO handle delete table from stb
tqReaderRemoveTbUidList(pExec->execHandle.pExecReader, tbUidList);
}
}
}
......
......@@ -498,7 +498,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tagArray, &tagVal);
}
}
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
pCreateTbReq->ctb.tagNum = size;
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
......
......@@ -22,7 +22,7 @@
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
*/
static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
......@@ -60,23 +60,6 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
return 0;
}
#if 0
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
return 0;
}
#endif
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
if (rowKey < minKey || rowKey > maxKey) {
......@@ -89,79 +72,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK
return 0;
}
#if 0
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
// pBlock->uid = htobe64(pBlock->uid);
// pBlock->suid = htobe64(pBlock->suid);
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen);
// pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d, invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
#endif
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
#endif
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t code = 0;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
......
......@@ -13,7 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tencode.h"
#include "tmsg.h"
#include "vnd.h"
#include "vnode.h"
#include "vnodeInt.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
......@@ -31,132 +35,210 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
int32_t code = 0;
SDecoder dc = {0};
int32_t lino = 0;
switch (pMsg->msgType) {
case TDMT_VND_CREATE_TABLE: {
int64_t ctime = taosGetTimestampMs();
int32_t nReqs;
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (tStartDecode(&dc) < 0) {
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tDecodeI32v(&dc, &nReqs) < 0) {
// flags
if (tDecodeI32v(pCoder, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
tb_uid_t uid = tGenIdPI64();
// name
char *name = NULL;
if (tStartDecode(&dc) < 0) {
if (tDecodeCStr(pCoder, &name) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tDecodeI32v(&dc, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
// uid
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
if (uid == 0) {
uid = tGenIdPI64();
}
if (tDecodeCStr(&dc, &name) < 0) {
*(int64_t *)(pCoder->data + pCoder->pos) = uid;
// ctime
*(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime;
tEndDecode(pCoder);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
if (pUid) *pUid = uid;
}
return code;
}
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
int64_t ctime = taosGetTimestampMs();
SDecoder dc = {0};
int32_t nReqs;
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (tStartDecode(&dc) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
tEndDecode(&dc);
if (tDecodeI32v(&dc, &nReqs) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
}
tEndDecode(&dc);
tDecoderClear(&dc);
} break;
case TDMT_VND_SUBMIT: {
int64_t ctime = taosGetTimestampMs();
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
tStartDecode(&dc);
_exit:
tDecoderClear(&dc);
return code;
}
extern int64_t tsMaxKeyByPrecision[];
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
int32_t code = 0;
int32_t lino = 0;
uint64_t nSubmitTbData;
if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < nSubmitTbData; i++) {
if (tStartDecode(&dc) < 0) {
SSubmitTbData submitTbData;
if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t flags;
if (tDecodeI32v(&dc, &flags) < 0) {
int64_t uid;
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid);
TSDB_CHECK_CODE(code, lino, _exit);
}
// submit data
if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
// SVCreateTbReq
if (tStartDecode(&dc) < 0) {
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
*(int64_t *)(pCoder->data + pCoder->pos) = uid;
pCoder->pos += sizeof(int64_t);
} else {
tDecodeI64(pCoder, &submitTbData.uid);
}
if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tDecodeI32v(&dc, NULL) < 0) {
// scan and check
TSKEY now = ctime;
if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
now *= 1000;
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
now *= 1000000;
}
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData;
if (tDecodeU64v(pCoder, &nColData) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
goto _exit;
}
char *name = NULL;
if (tDecodeCStr(&dc, &name) < 0) {
SColData colData = {0};
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
}
}
} else {
uint64_t nRow;
if (tDecodeU64v(pCoder, &nRow) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
goto _exit;
}
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
if (uid == 0) {
uid = tGenIdPI64();
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
pCoder->pos += pRow->len;
if (pRow->ts < minKey || pRow->ts > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
}
}
}
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
tEndDecode(pCoder);
tEndDecode(&dc);
_exit:
return code;
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
SDecoder *pCoder = &(SDecoder){0};
tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
// SSubmitTbData
int64_t suid;
if (tDecodeI64(&dc, &suid) < 0) {
if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
*(int64_t *)(dc.data + dc.pos) = uid;
uint64_t nSubmitTbData;
if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
tEndDecode(&dc);
int64_t ctime = taosGetTimestampMs();
for (int32_t i = 0; i < nSubmitTbData; i++) {
code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
TSDB_CHECK_CODE(code, lino, _exit);
}
tEndDecode(&dc);
tDecoderClear(&dc);
} break;
case TDMT_VND_DELETE: {
tEndDecode(pCoder);
_exit:
tDecoderClear(pCoder);
return code;
}
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t size;
int32_t ret;
uint8_t *pCont;
SEncoder *pCoder = &(SEncoder){0};
SDeleteRes res = {0};
SReadHandle handle = {
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) {
goto _err;
}
if (code) goto _exit;
// malloc and encode
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
......@@ -174,15 +256,33 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
pMsg->contLen = size + sizeof(SMsgHead);
taosArrayDestroy(res.uidList);
_exit:
return code;
}
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TDMT_VND_CREATE_TABLE: {
code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
} break;
case TDMT_VND_SUBMIT: {
code = vnodePreProcessSubmitMsg(pVnode, pMsg);
} break;
case TDMT_VND_DELETE: {
code = vnodePreProcessDeleteMsg(pVnode, pMsg);
} break;
default:
break;
}
return code;
_err:
vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
_exit:
if (code) {
vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
pMsg->msgType);
}
return code;
}
......@@ -1055,7 +1155,6 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
}
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1
int32_t code = 0;
terrno = 0;
......@@ -1089,12 +1188,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderClear(&dc);
}
// check
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
if (code) {
goto _exit;
}
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
......@@ -1243,145 +1336,6 @@ _exit:
}
return code;
#else
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0};
int32_t nRows = 0;
int32_t tsize, ret;
SEncoder encoder = {0};
SArray *newTbUids = NULL;
SVStatis statis = {0};
bool tbCreated = false;
terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0;
pSubmitReq->version = version;
statis.nBatchInsert = 1;
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
pRsp->code = terrno;
goto _exit;
}
submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
if (!submitRsp.pArray || !newTbUids) {
pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (;;) {
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
SSubmitBlkRsp submitBlkRsp = {0};
tbCreated = false;
// create table for auto create table mode
if (msgIter.schemaLen > 0) {
// tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
// if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
// pRsp->code = TSDB_CODE_INVALID_MSG;
// tDecoderClear(&decoder);
// taosArrayDestroy(createTbReq.ctb.tagName);
// goto _exit;
// }
// if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
// pRsp->code = terrno;
// tDecoderClear(&decoder);
// taosArrayDestroy(createTbReq.ctb.tagName);
// goto _exit;
// }
// if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
// pRsp->code = terrno;
// tDecoderClear(&decoder);
// taosArrayDestroy(createTbReq.ctb.tagName);
// goto _exit;
// }
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) {
// if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
// submitBlkRsp.code = terrno;
// pRsp->code = terrno;
// tDecoderClear(&decoder);
// taosArrayDestroy(createTbReq.ctb.tagName);
// goto _exit;
// }
} else {
if (NULL != submitBlkRsp.pMeta) {
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
}
// taosArrayPush(newTbUids, &createTbReq.uid);
submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
tbCreated = true;
}
// msgIter.uid = createTbReq.uid;
// if (createTbReq.type == TSDB_CHILD_TABLE) {
// msgIter.suid = createTbReq.ctb.suid;
// } else {
// msgIter.suid = 0;
// }
// tDecoderClear(&decoder);
// taosArrayDestroy(createTbReq.ctb.tagName);
}
if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
submitBlkRsp.code = terrno;
}
submitRsp.numOfRows += submitBlkRsp.numOfRows;
submitRsp.affectedRows += submitBlkRsp.affectedRows;
if (tbCreated || submitBlkRsp.code) {
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
}
}
// if (taosArrayGetSize(newTbUids) > 0) {
// vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
// (int32_t)taosArrayGetSize(newTbUids));
// }
// tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
_exit:
taosArrayDestroy(newTbUids);
// tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
// pRsp->pCont = rpcMallocCont(tsize);
// pRsp->contLen = tsize;
// tEncoderInit(&encoder, pRsp->pCont, tsize);
// tEncodeSSubmitRsp(&encoder, &submitRsp);
// tEncoderClear(&encoder);
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
// TODO: the partial success scenario and the error case
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
// 1/level 2.
// TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
statis.nBatchInsertSuccess = 1;
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
}
// N.B. not strict as the following procedure is not atomic
atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
return 0;
#endif
return 0;
}
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
......
......@@ -873,7 +873,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
......
......@@ -1259,9 +1259,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = 0;
} break;
case STREAM_CREATE_CHILD_TABLE: {
return pBlock;
} break;
default:
ASSERT(0);
break;
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
}
}
......
......@@ -985,11 +985,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
return pDest;
}
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
void* pValue = NULL;
if (streamStateGetParName(pState, groupId, &pValue) != 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId;
if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
......@@ -999,6 +1001,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len);
streamStatePutParName(pState, groupId, tbName);
memcpy(pTmpBlock->info.parTbName, tbName, len);
pDestBlock->info.rows--;
} else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
......@@ -1013,7 +1016,6 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock);
}
......@@ -1030,7 +1032,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
SSDataBlock* pSrc = pInfo->pInputDataBlock;
while (pInfo->pTbNameIte != NULL) {
if (pInfo->pTbNameIte != NULL) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
......
......@@ -4778,6 +4778,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, "single interval");
return pBlock;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
......
......@@ -744,7 +744,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
......
......@@ -261,6 +261,21 @@ if $data04 != NULL then
goto loop2
endi
print ===== drop ...
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop database if exists test;
sql drop database if exists test1;
sql drop database if exists test2;
sql drop database if exists test3;
sql drop database if exists result;
sql drop database if exists result1;
sql drop database if exists result2;
sql drop database if exists result3;
print ===== step6
sql create database result4 vgroups 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册