提交 43e06b91 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3_liaohj

......@@ -23,7 +23,7 @@ By subscribing to a topic, a consumer can obtain the latest data in that topic i
To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
Tips:The default data subscription is to consume data from the wal. If the wal is deleted, the consumed data will be incomplete. At this time, you can set the parameter experimental.snapshot.enable to true to obtain all data from the tsdb, but in this way, the consumption order of the data cannot be guaranteed. Therefore, it is recommended to set a reasonable retention policy for WAL based on your consumption situation to ensure that you can subscribe all data from WAL.
Tips: Data subscription is to consume data from the wal. If some wal files are deleted according to WAL retention policy, the deleted data can't be consumed any more. So you need to set a reasonable value for parameter `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consume the data in a timely way to make sure there is no data loss. This behavior is similar to Kafka and other widely used message queue products.
## Data Schema and API
......@@ -294,7 +294,6 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `experimental.snapshot.enable` | boolean | Specify whether to consume data in TSDB; true: both data in WAL and in TSDB can be consumed; false: only data in WAL can be consumed | default value: false |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false
The method of specifying these parameters depends on the language used:
......@@ -312,7 +311,6 @@ tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -368,7 +366,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
......@@ -416,7 +413,6 @@ Python programs use the following parameters:
| `enable.auto.commit` | string | Commit automatically | pecify `true` or `false` |
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental.snapshot.enable` | string | Specify whether it's allowed to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
</TabItem>
......
......@@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val)
RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY expr [, expr] ...
......
......@@ -886,7 +886,7 @@ INTERP(expr)
- The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range and must be specified. timestamp2 is the ending point of the output time range and must be specified.
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable.
- `INTERP` can be applied to supertable by interpolating primary key sorted data of all its childtables. It can also be used with `partition by tbname` when applied to supertable to generate interpolation on each single timeline.
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0).
......
......@@ -25,7 +25,8 @@ import CDemo from "./_sub_c.mdx";
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
注意:默认是从wal消费数据,如果wal被删除,消费到的数据会不全,此时可以将参数 experimental.snapshot.enable 设置为true,从tsdb获取全部数据,但是这样的话就不能保证数据的消费顺序。所以建议根据自己的消费情况合理的设置wal的保留策略,保证可以从wal里订阅到全部数据。
注意:数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似。
## 主要数据结构和 API
不同语言下, TMQ 订阅相关的 API 及数据结构如下:
......@@ -293,7 +294,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default;从头开始订阅; <br/>`latest`: 仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据。当其关闭时,只能消费依据 WAL 保留策略仍然在WAL中的数据;当其打开时,除WAL中的数据以外,也能够消费已经从WAL中删除但落盘到TSDB中的数据 | 实验功能,默认关闭 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 |
对于不同编程语言,其设置方式如下:
......@@ -311,7 +311,6 @@ tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -367,7 +366,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
......@@ -417,7 +415,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
| `enable.auto.commit` | string | 启用自动提交 | 合法值:`true`, `false` |
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
</TabItem>
......
......@@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val)
RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY expr [, expr] ...
......
......@@ -888,7 +888,7 @@ INTERP(expr)
- INTERP 的输出时间范围根据 RANGE(timestamp1,timestamp2)字段来指定,需满足 timestamp1 <= timestamp2。其中 timestamp1(必选值)为输出时间范围的起始值,即如果 timestamp1 时刻符合插值条件则 timestamp1 为输出的第一条记录,timestamp2(必选值)为输出时间范围的结束值,即输出的最后一条记录的 timestamp 不能大于 timestamp2。
- INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值.
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句)
- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用
- INTERP 作用于超级表时, 会将该超级表下的所有子表数据按照主键列排序后进行插值计算,也可以搭配 PARTITION BY tbname 使用,将结果强制规约到单个时间线
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。
......
......@@ -38,6 +38,11 @@ typedef struct STimeSliceOperatorInfo {
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
int64_t prevTs;
bool prevTsSet;
uint64_t groupId;
SGroupKeys* pPrevGroupKey;
SSDataBlock* pNextGroupRes;
} STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param);
......@@ -168,18 +173,55 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
}
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
int32_t curIndex, int32_t rows) {
int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
if (currentTs > pSliceInfo->win.ekey) {
return false;
}
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
return true;
}
pSliceInfo->prevTsSet = true;
pSliceInfo->prevTs = currentTs;
if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
if (currentTs == nextTs) {
return true;
}
}
return false;
}
static bool isInterpFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_INTERP);
}
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_GROUP_KEY);
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
bool beforeTs) {
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
int32_t rows = pResBlock->info.rows;
timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
// todo set the correct primary timestamp column
// output the result
bool hasInterp = true;
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
if (isIrowtsPseudoColumn(pExprInfo)) {
......@@ -189,6 +231,30 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
bool isFilled = true;
colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
continue;
} else if (!isInterpFunc(pExprInfo)) {
if (isGroupKeyFunc(pExprInfo)) {
if (pSrcBlock != NULL) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, index)) {
colDataSetNULL(pDst, pResBlock->info.rows);
continue;
}
char* v = colDataGetData(pSrc, index);
colDataSetVal(pDst, pResBlock->info.rows, v, false);
} else {
// use stored group key
SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
if (pkey->isNull == false) {
colDataSetVal(pDst, rows, pkey->pData, false);
} else {
colDataSetNULL(pDst, rows);
}
}
}
continue;
}
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
......@@ -314,7 +380,7 @@ static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp*
bool isFilled = false;
colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
} else {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, index)) {
......@@ -414,7 +480,31 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
return TSDB_CODE_SUCCESS;
}
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
if (pInfo->pPrevGroupKey != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
if (pInfo->pPrevGroupKey == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
if (isGroupKeyFunc(pExprInfo)) {
pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
pInfo->pPrevGroupKey->isNull = false;
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
int32_t code;
code = initPrevRowsKeeper(pInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
......@@ -431,138 +521,275 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock
return TSDB_CODE_FAILED;
}
code = initGroupKeyKeeper(pInfo, pExprSup);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pPrevRow == NULL) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
pKey->isNull = false;
}
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
pInfo->isPrevRowSet = false;
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
return TSDB_CODE_SUCCESS;
}
blockDataCleanup(pResBlock);
static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pNextRow == NULL) {
return TSDB_CODE_SUCCESS;
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
pKey->isNull = false;
}
pInfo->isNextRowSet = false;
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
return TSDB_CODE_SUCCESS;
}
static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pLinearInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SFillLinearInfo *pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i);
pLinearInfo->start.key = INT64_MIN;
pLinearInfo->end.key = INT64_MIN;
pLinearInfo->isStartSet = false;
pLinearInfo->isEndSet = false;
}
return TSDB_CODE_SUCCESS;
}
static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
resetPrevRowsKeeper(pInfo);
resetNextRowsKeeper(pInfo);
resetFillLinearInfo(pInfo);
return TSDB_CODE_SUCCESS;
}
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
// check for duplicate timestamps
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
if (ts == pSliceInfo->current) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
if (ts == pSliceInfo->current) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
} else {
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
} else {
// ignore current row, and do nothing
}
// add current row if timestamp match
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doKeepPrevRows(pSliceInfo, pBlock, i);
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
// add current row if timestamp match
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
}
}
}
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false);
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
}
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
if (isGroupKeyFunc(pExprInfo)) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, 0)) {
pGroupKey->isNull = true;
break;
}
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
char* v = colDataGetData(pSrc, 0);
if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
memcpy(pGroupKey->pData, v, varDataTLen(v));
} else {
memcpy(pGroupKey->pData, v, pGroupKey->bytes);
}
pGroupKey->isNull = false;
break;
}
}
}
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
pSliceInfo->current = pSliceInfo->win.skey;
pSliceInfo->prevTsSet = false;
resetKeeperInfo(pSliceInfo);
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
while (1) {
if (pSliceInfo->pNextGroupRes != NULL) {
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
pSliceInfo->pNextGroupRes = NULL;
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break;
}
if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
pSliceInfo->groupId = pBlock->info.id.groupId;
} else {
if (pSliceInfo->groupId != pBlock->info.id.groupId) {
pSliceInfo->groupId = pBlock->info.id.groupId;
pSliceInfo->pNextGroupRes = pBlock;
break;
}
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (pOperator->status == OP_EXEC_DONE) {
break;
}
// restore initial value for next group
resetTimesliceInfo(pSliceInfo);
if (pResBlock->info.rows >= 4096) {
break;
}
}
// restore the value
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
......@@ -614,6 +841,11 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;
pInfo->prevTsSet = false;
pInfo->prevTs = 0;
pInfo->groupId = 0;
pInfo->pPrevGroupKey = NULL;
pInfo->pNextGroupRes = NULL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
......@@ -661,6 +893,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
taosMemoryFree(pKey->end.val);
}
taosArrayDestroy(pInfo->pLinearInfo);
taosMemoryFree(pInfo->pPrevGroupKey->pData);
taosMemoryFree(pInfo->pPrevGroupKey);
cleanupExprSupp(&pInfo->scalarSup);
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
......
......@@ -1523,9 +1523,7 @@ static int32_t translateInterpFunc(STranslateContext* pCxt, SFunctionNode* pFunc
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
if ((NULL != pTable && QUERY_NODE_REAL_TABLE != nodeType(pTable))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
......
......@@ -23,6 +23,8 @@ class TDTestCase:
stbname = "stb"
ctbname1 = "ctb1"
ctbname2 = "ctb2"
ctbname3 = "ctb3"
num_of_ctables = 3
tdSql.prepare()
......@@ -816,17 +818,26 @@ class TDTestCase:
)
tdSql.execute(
f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(1)
f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(2)
'''
)
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.execute(
f'''create table if not exists {dbname}.{ctbname3} using {dbname}.{stbname} tags(3)
'''
)
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:01', 1, 1, 1, 1, 1.0, 1.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:07', 7, 7, 7, 7, 7.0, 7.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:13', 13, 13, 13, 13, 13.0, 13.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:03', 3, 3, 3, 3, 3.0, 3.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:09', 9, 9, 9, 9, 9.0, 9.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:11', 11, 11, 11, 11, 11.0, 11.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:17', 17, 17, 17, 17, 17.0, 17.0, true, 'varchar', 'nchar')")
tdSql.execute(f"flush database {dbname}");
......@@ -834,7 +845,7 @@ class TDTestCase:
# test fill null
## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(null)")
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(null)")
tdSql.checkRows(11)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, None)
......@@ -881,7 +892,7 @@ class TDTestCase:
# test fill value
## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(value, 1)")
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(value, 1)")
tdSql.checkRows(11)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 1)
......@@ -895,7 +906,7 @@ class TDTestCase:
tdSql.checkData(9, 0, 1)
tdSql.checkData(10, 0, 15)
## | . | {} | . |
# | . | {} | . |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-03 00:00:05', '2020-02-07 00:00:05') every(1d) fill(value, 1)")
tdSql.checkRows(5)
tdSql.checkData(0, 0, 1)
......@@ -928,7 +939,7 @@ class TDTestCase:
# test fill prev
## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(prev)")
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(prev)")
tdSql.checkRows(11)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 5)
......@@ -973,7 +984,7 @@ class TDTestCase:
# test fill next
## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(next)")
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(next)")
tdSql.checkRows(11)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 15)
......@@ -1015,7 +1026,7 @@ class TDTestCase:
# test fill linear
## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(linear)")
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(linear)")
tdSql.checkRows(11)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 6)
......@@ -2391,19 +2402,516 @@ class TDTestCase:
tdLog.printNoPrefix("==========step13:stable cases")
tdLog.printNoPrefix("==========step13:test stable cases")
# select interp from supertable
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(19)
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, None)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, None)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, None)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, None)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, None)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, None)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, None)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, None)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
tdSql.checkRows(19)
tdSql.checkData(0, 2, 0)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 0)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 0)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 0)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 0)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 0)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 0)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 0)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 0)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, 0)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(18)
tdSql.checkData(0, 0, '2020-02-01 00:00:01.000')
tdSql.checkData(0, 1, False)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(17, 0, '2020-02-01 00:00:18.000')
tdSql.checkData(17, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(18)
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
tdSql.checkData(0, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
tdSql.checkData(17, 1, False)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 2)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 4)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 6)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 10)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 12)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 14)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 16)
tdSql.checkData(16, 2, 17)
# select interp from supertable partition by tbname
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55}
point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17}
rows_per_partition = 19
tdSql.checkRows(rows_per_partition * num_of_ctables)
for i in range(num_of_ctables):
for j in range(rows_per_partition):
row = j + i * rows_per_partition
tdSql.checkData(row, 0, f'ctb{i + 1}')
tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000')
if row in point_idx:
tdSql.checkData(row, 2, False)
else:
tdSql.checkData(row, 2, True)
if row in point_idx:
tdSql.checkData(row, 3, point_dict[row])
else:
tdSql.checkData(row, 3, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55}
point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17}
rows_per_partition = 19
tdSql.checkRows(rows_per_partition * num_of_ctables)
for i in range(num_of_ctables):
for j in range(rows_per_partition):
row = j + i * rows_per_partition
tdSql.checkData(row, 0, f'ctb{i + 1}')
tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000')
if row in point_idx:
tdSql.checkData(row, 2, False)
else:
tdSql.checkData(row, 2, True)
if row in point_idx:
tdSql.checkData(row, 3, point_dict[row])
else:
tdSql.checkData(row, 3, 0)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(48)
for i in range(0, 18):
tdSql.checkData(i, 0, 'ctb1')
for i in range(18, 34):
tdSql.checkData(i, 0, 'ctb2')
for i in range(34, 48):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(18, 1, '2020-02-01 00:00:03.000')
tdSql.checkData(33, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(34, 1, '2020-02-01 00:00:05.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:18.000')
for i in range(0, 6):
tdSql.checkData(i, 3, 1)
for i in range(6, 12):
tdSql.checkData(i, 3, 7)
for i in range(12, 18):
tdSql.checkData(i, 3, 13)
for i in range(18, 24):
tdSql.checkData(i, 3, 3)
for i in range(24, 30):
tdSql.checkData(i, 3, 9)
for i in range(30, 34):
tdSql.checkData(i, 3, 15)
for i in range(34, 40):
tdSql.checkData(i, 3, 5)
for i in range(40, 46):
tdSql.checkData(i, 3, 11)
for i in range(46, 48):
tdSql.checkData(i, 3, 17)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
for i in range(0, 14):
tdSql.checkData(i, 0, 'ctb1')
for i in range(14, 30):
tdSql.checkData(i, 0, 'ctb2')
for i in range(30, 48):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(29, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(30, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:17.000')
for i in range(0, 2):
tdSql.checkData(i, 3, 1)
for i in range(2, 8):
tdSql.checkData(i, 3, 7)
for i in range(8, 14):
tdSql.checkData(i, 3, 13)
for i in range(14, 18):
tdSql.checkData(i, 3, 3)
for i in range(18, 24):
tdSql.checkData(i, 3, 9)
for i in range(24, 30):
tdSql.checkData(i, 3, 15)
for i in range(30, 36):
tdSql.checkData(i, 3, 5)
for i in range(36, 42):
tdSql.checkData(i, 3, 11)
for i in range(42, 48):
tdSql.checkData(i, 3, 17)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
for i in range(0, 13):
tdSql.checkData(i, 0, 'ctb1')
for i in range(13, 26):
tdSql.checkData(i, 0, 'ctb2')
for i in range(26, 39):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(12, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:03.000')
tdSql.checkData(25, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(26, 1, '2020-02-01 00:00:05.000')
tdSql.checkData(38, 1, '2020-02-01 00:00:17.000')
for i in range(0, 13):
tdSql.checkData(i, 3, i + 1)
for i in range(13, 26):
tdSql.checkData(i, 3, i - 10)
for i in range(26, 39):
tdSql.checkData(i, 3, i - 21)
# select interp from supertable partition by column
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(171)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
tdSql.checkRows(171)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(90)
tdSql.error(f"select interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)")
#tdSql.checkRows(13)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(90)
#tdSql.query(f"select interp(c0) from {dbname}.{ctbname1} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)")
#tdSql.checkRows(13)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(9)
# select interp from supertable partition by tag
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(57)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
tdSql.checkRows(57)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
# select interp from supertable filter
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(27)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(27)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
# select interp from supertable filter limit
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 20")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 40")
tdSql.checkRows(39)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
# select interp from supertable with scalar expression
tdSql.query(f"select _irowts, _isfilled, interp(1 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, 2.0)
tdSql.query(f"select _irowts, _isfilled, interp(c0 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 2)
tdSql.query(f"select _irowts, _isfilled, interp(c0 * 2) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, (i + 1) * 2)
tdSql.query(f"select _irowts, _isfilled, interp(c0 + c1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, (i + 1) * 2)
# check duplicate timestamp
# add duplicate timestamp for different child tables
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.error(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)")
#tdSql.checkRows(13)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
#tdSql.query(f"select _irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
#tdSql.query(f"select tbname,_irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
tdLog.printNoPrefix("======step 14: test interp pseudo columns")
tdSql.error(f"select _irowts, c6 from {dbname}.{tbname}")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册