Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ea493034
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ea493034
编写于
7月 18, 2022
作者:
W
wenzhouwww@live.cn
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
b79845dd
04b12ee7
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
510 addition
and
412 deletion
+510
-412
docs/zh/12-taos-sql/12-interval.md
docs/zh/12-taos-sql/12-interval.md
+21
-15
docs/zh/12-taos-sql/14-stream.md
docs/zh/12-taos-sql/14-stream.md
+0
-0
docs/zh/12-taos-sql/16-operators.md
docs/zh/12-taos-sql/16-operators.md
+0
-0
docs/zh/12-taos-sql/17-json.md
docs/zh/12-taos-sql/17-json.md
+0
-0
docs/zh/12-taos-sql/19-limit.md
docs/zh/12-taos-sql/19-limit.md
+0
-0
docs/zh/12-taos-sql/22-information.md
docs/zh/12-taos-sql/22-information.md
+5
-0
include/common/tcommon.h
include/common/tcommon.h
+0
-2
include/libs/scalar/scalar.h
include/libs/scalar/scalar.h
+2
-0
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+9
-17
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+16
-11
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+18
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+14
-17
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+23
-34
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+2
-0
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+1
-1
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+29
-0
tests/system-test/2-query/last_row.py
tests/system-test/2-query/last_row.py
+362
-309
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-1
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
docs/zh/12-taos-sql/12-interval.md
浏览文件 @
ea493034
...
...
@@ -3,9 +3,9 @@ sidebar_label: 时序数据特色查询
title
:
时序数据特色查询
---
TDengine
是专为时序数据而研发的大数据平台,存储和计算都针对时序数据的特定进行了量身定制,在支持标准SQL
的基础之上,还提供了一系列贴合时序业务场景的特色查询语法,极大的方便时序场景的应用开发。
TDengine
是专为时序数据而研发的大数据平台,存储和计算都针对时序数据的特定进行了量身定制,在支持标准 SQL
的基础之上,还提供了一系列贴合时序业务场景的特色查询语法,极大的方便时序场景的应用开发。
TDengine提供的特色查询包括标签切分查询和窗口切分查询。
TDengine
提供的特色查询包括标签切分查询和窗口切分查询。
## 标签切分查询
...
...
@@ -14,13 +14,14 @@ TDengine提供的特色查询包括标签切分查询和窗口切分查询。
```
sql
PARTITION
BY
tag_list
```
其中
`tag_list`
是标签列的列表,还可以包括tbname伪列。
TDengine按如下方式处理标签切分子句:
其中
`tag_list`
是标签列的列表,还可以包括 tbname 伪列。
TDengine 按如下方式处理标签切分子句:
标签切分子句位于
`WHERE`
子句之后,且不能和
`JOIN`
子句一起使用。
标签切分子句将超级表数据按指定的标签组合进行切分,然后对每个切分的分片进行指定的计算。计算由之后的子句定义(窗口子句、
`GROUP BY`
子句或
`SELECT`
子句)。
标签切分子句可以和窗口切分子句(或
`GROUP BY`
子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,下面的示例将数据按标签
`location`
进行分组,并对每个组按
10
分钟进行降采样,取其最大值。
标签切分子句可以和窗口切分子句(或
`GROUP BY`
子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,下面的示例将数据按标签
`location`
进行分组,并对每个组按
10
分钟进行降采样,取其最大值。
```
sql
select
max
(
current
)
from
meters
partition
by
location
interval
(
10
m
)
...
...
@@ -42,27 +43,31 @@ SELECT function_list FROM tb_name
在上述语法中的具体限制如下
### 窗口切分查询中使用函数的限制
-
在聚合查询中,function_list 位置允许使用聚合和选择函数,并要求每个函数仅输出单个结果(例如:COUNT、AVG、SUM、STDDEV、LEASTSQUARES、PERCENTILE、MIN、MAX、FIRST、LAST),而不能使用具有多行输出结果的函数(例如:DIFF 以及四则运算)。
-
此外 LAST_ROW 查询也不能与窗口聚合同时出现。
-
标量函数(如:CEIL/FLOOR 等)也不能使用在窗口聚合查询中。
### 窗口子句的规则
-
窗口子句位于标签切分子句之后,GROUP BY子句之前,且不可以和GROUP BY子句一起使用。
-
窗口子句将数据按窗口进行切分,对每个窗口进行SELECT列表中的表达式的计算,SELECT列表中的表达式只能包含:
-
窗口子句位于标签切分子句之后,GROUP BY 子句之前,且不可以和 GROUP BY 子句一起使用。
-
窗口子句将数据按窗口进行切分,对每个窗口进行 SELECT 列表中的表达式的计算,SELECT 列表中的表达式只能包含:
-
常量。
-
聚集函数。
-
包含上面表达式的表达式。
-
窗口子句不可以和
GROUP BY
子句一起使用。
-
窗口子句不可以和
GROUP BY
子句一起使用。
-
WHERE 语句可以指定查询的起止时间和其他过滤条件。
### FILL 子句
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
1.
不进行填充:NONE(默认填充模式)。
2.
VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如FILL(VALUE, 1.23),相应列为INT类型,则填充值为1。
3.
PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
4.
NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
5.
LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
6.
NEXT 填充:使用下一个非 NULL 值填充数据。例如:FILL(NEXT)。
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
1.
不进行填充:NONE(默认填充模式)。
2.
VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1。
3.
PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
4.
NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
5.
LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
6.
NEXT 填充:使用下一个非 NULL 值填充数据。例如:FILL(NEXT)。
:::info
...
...
@@ -93,6 +98,7 @@ SELECT COUNT(*) FROM temp_tb_1 INTERVAL(1m) SLIDING(2m);
```
使用时间窗口需要注意:
-
聚合时间段的窗口宽度由关键词 INTERVAL 指定,最短时间间隔 10 毫秒(10a);并且支持偏移 offset(偏移必须小于间隔),也即时间窗口划分与“UTC 时刻 0”相比的偏移量。SLIDING 语句用于指定聚合时间段的前向增量,也即每次窗口向前滑动的时长。
-
使用 INTERVAL 语句时,除非极特殊的情况,都要求把客户端和服务端的 taos.cfg 配置文件中的 timezone 参数配置为相同的取值,以避免时间处理函数频繁进行跨时区转换而导致的严重性能影响。
-
返回的结果中时间序列严格单调递增。
...
...
docs/zh/12-taos-sql/14-stream.md
0 → 100644
浏览文件 @
ea493034
docs/zh/12-taos-sql/1
3
-operators.md
→
docs/zh/12-taos-sql/1
6
-operators.md
浏览文件 @
ea493034
文件已移动
docs/zh/12-taos-sql/1
6
-json.md
→
docs/zh/12-taos-sql/1
7
-json.md
浏览文件 @
ea493034
文件已移动
docs/zh/12-taos-sql/1
4
-limit.md
→
docs/zh/12-taos-sql/1
9
-limit.md
浏览文件 @
ea493034
文件已移动
docs/zh/12-taos-sql/22-information.md
0 → 100644
浏览文件 @
ea493034
---
sidebar_label
:
Information内置数据库
title
:
Information内置数据库
---
include/common/tcommon.h
浏览文件 @
ea493034
...
...
@@ -80,8 +80,6 @@ typedef struct {
SArray
*
pTableList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
bool
needSortTableByGroupId
;
void
*
pTagCond
;
void
*
pTagIndexCond
;
uint64_t
suid
;
}
STableListInfo
;
...
...
include/libs/scalar/scalar.h
浏览文件 @
ea493034
...
...
@@ -117,6 +117,8 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
int32_t
stateCountScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
stateDurationScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
histogramScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
topScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
bottomScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
#ifdef __cplusplus
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
ea493034
...
...
@@ -15,6 +15,7 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#include "vnode.h"
#include "function.h"
#include "nodes.h"
#include "plannodes.h"
...
...
@@ -106,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock
*
createResDataBlock
(
SDataBlockDescNode
*
pNode
);
EDealRes
doTranslateTagExpr
(
SNode
**
pNode
,
void
*
pContext
);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
vnode
,
SScanPhysiNode
*
pScanNode
,
STableListInfo
*
pListInfo
);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
STableListInfo
*
pListInfo
);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
...
...
@@ -128,4 +129,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
ea493034
...
...
@@ -311,19 +311,10 @@ typedef struct STableScanInfo {
int32_t
dataBlockLoadFlag
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SSampleExecInfo
sample
;
// sample execution info
int32_t
currentGroupId
;
int32_t
currentTable
;
#if 0
struct {
uint64_t uid;
int64_t ts;
} lastStatus;
#endif
int8_t
scanMode
;
int8_t
noTable
;
int8_t
scanMode
;
int8_t
noTable
;
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
...
...
@@ -429,8 +420,9 @@ typedef struct SStreamScanInfo {
// status for tmq
// SSchemaWrapper schema;
STqOffset
offset
;
STqOffset
offset
;
SNode
*
pTagCond
;
SNode
*
pTagIndexCond
;
}
SStreamScanInfo
;
typedef
struct
SSysTableScanInfo
{
...
...
@@ -874,8 +866,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SReadHandle
*
readHandle
,
uint64_t
uid
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -966,12 +958,12 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idstr
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
ea493034
...
...
@@ -265,7 +265,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
static
bool
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
SMeta
*
metaHandle
)
{
int32_t
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
metaHandle
,
0
);
metaGetTableEntryByUid
(
&
mr
,
info
->
uid
);
...
...
@@ -280,19 +280,22 @@ static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
terrno
=
code
;
nodesDestroyNode
(
pTagCondTmp
);
return
false
;
*
pQualified
=
false
;
return
code
;
}
ASSERT
(
nodeType
(
pNew
)
==
QUERY_NODE_VALUE
);
SValueNode
*
pValue
=
(
SValueNode
*
)
pNew
;
ASSERT
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_BOOL
);
bool
result
=
pValue
->
datum
.
b
;
*
pQualified
=
pValue
->
datum
.
b
;
nodesDestroyNode
(
pNew
);
return
result
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
STableListInfo
*
pListInfo
)
{
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
S
Node
*
pTagCond
,
SNode
*
pTagIndexCond
,
S
TableListInfo
*
pListInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pListInfo
->
pTableList
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
...
...
@@ -304,8 +307,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
pListInfo
->
suid
=
pScanNode
->
suid
;
SNode
*
pTagCond
=
(
SNode
*
)
pListInfo
->
pTagCond
;
SNode
*
pTagIndexCond
=
(
SNode
*
)
pListInfo
->
pTagIndexCond
;
if
(
pScanNode
->
tableType
==
TSDB_SUPER_TABLE
)
{
if
(
pTagIndexCond
)
{
SIndexMetaArg
metaArg
=
{
...
...
@@ -345,9 +346,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
int32_t
i
=
0
;
while
(
i
<
taosArrayGetSize
(
pListInfo
->
pTableList
))
{
STableKeyInfo
*
info
=
taosArrayGet
(
pListInfo
->
pTableList
,
i
);
bool
isOk
=
isTableOk
(
info
,
pTagCond
,
metaHandle
);
if
(
terrno
)
return
terrno
;
if
(
!
isOk
)
{
bool
qualified
=
true
;
code
=
isTableOk
(
info
,
pTagCond
,
metaHandle
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
!
qualified
)
{
taosArrayRemove
(
pListInfo
->
pTableList
,
i
);
continue
;
}
...
...
@@ -362,7 +368,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush
(
pListInfo
->
pGroupList
,
&
pListInfo
->
pTableList
);
return
code
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
ea493034
...
...
@@ -153,7 +153,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return
pTaskInfo
;
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
,
const
char
*
idstr
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
...
...
@@ -164,7 +164,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s
"
,
*
id
,
tstrerror
(
terrno
)
);
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s
, %s"
,
*
id
,
tstrerror
(
terrno
),
idstr
);
continue
;
}
...
...
@@ -172,6 +172,21 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
if
(
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
||
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
if
(
pScanInfo
->
pTagCond
!=
NULL
)
{
bool
qualified
=
false
;
STableKeyInfo
info
=
{.
groupId
=
0
,
.
uid
=
mr
.
me
.
uid
,
.
lastKey
=
0
};
code
=
isTableOk
(
&
info
,
pScanInfo
->
pTagCond
,
pScanInfo
->
readHandle
.
meta
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to filter new table, uid:0x%"
PRIx64
", %s"
,
info
.
uid
,
idstr
);
continue
;
}
if
(
!
qualified
)
{
continue
;
}
}
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
// handle multiple partition
...
...
@@ -194,7 +209,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
int32_t
code
=
0
;
SStreamScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
,
GET_TASKID
(
pTaskInfo
)
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqReaderAddTbUidList
(
pScanInfo
->
tqReader
,
qa
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ea493034
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -4369,8 +4368,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
}
SOperatorInfo
*
createOperatorTree
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableListInfo
*
pTableListInfo
,
const
char
*
pUser
)
{
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
pUser
)
{
int32_t
type
=
nodeType
(
pPhyNode
);
if
(
pPhyNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pPhyNode
->
pChildren
)
==
0
)
{
...
...
@@ -4378,7 +4376,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4398,7 +4396,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4411,7 +4409,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pTableListInfo
,
pHandle
,
pTaskInfo
,
queryId
,
taskId
);
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pTableListInfo
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
...
...
@@ -4428,15 +4426,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
};
if
(
pHandle
)
{
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
}
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTaskInfo
,
&
twSup
,
queryId
,
taskId
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTagCond
,
pTaskInfo
,
&
twSup
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
@@ -4445,7 +4442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
==
type
)
{
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanPhyNode
,
pTableListInfo
);
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanPhyNode
,
pTa
gCond
,
pTagIndexCond
,
pTa
bleListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
terrno
;
return
NULL
;
...
...
@@ -4481,8 +4478,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
==
type
)
{
SLastRowScanPhysiNode
*
pScanNode
=
(
SLastRowScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4506,7 +4502,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo
**
ops
=
taosMemoryCalloc
(
size
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
ops
[
i
]
=
createOperatorTree
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableListInfo
,
pUser
);
ops
[
i
]
=
createOperatorTree
(
pChildNode
,
pTaskInfo
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pUser
);
if
(
ops
[
i
]
==
NULL
)
{
return
NULL
;
}
...
...
@@ -4689,6 +4685,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return
pList
;
}
#if 0
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
...
...
@@ -4722,6 +4719,7 @@ _error:
terrno = code;
return NULL;
}
#endif
static
int32_t
extractTbscanInStreamOpTree
(
SOperatorInfo
*
pOperator
,
STableScanInfo
**
ppInfo
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -4765,6 +4763,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return
-
1
;
}
#if 0
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
STableScanInfo* pTableScanInfo = NULL;
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
...
...
@@ -4788,6 +4787,7 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
// TODO: set uid and ts to data reader
return 0;
}
#endif
int32_t
encodeOperator
(
SOperatorInfo
*
ops
,
char
**
result
,
int32_t
*
length
,
int32_t
*
nOptrWithVal
)
{
int32_t
code
=
TDB_CODE_SUCCESS
;
...
...
@@ -4939,10 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(
*
pTaskInfo
)
->
sql
=
sql
;
(
*
pTaskInfo
)
->
tableqinfoList
.
pTagCond
=
pPlan
->
pTagCond
;
(
*
pTaskInfo
)
->
tableqinfoList
.
pTagIndexCond
=
pPlan
->
pTagIndexCond
;
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
user
);
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
code
=
(
*
pTaskInfo
)
->
code
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ea493034
...
...
@@ -1497,9 +1497,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree
(
pStreamScan
);
}
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
)
{
SStreamScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -1512,6 +1511,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
pInfo
->
pTagCond
=
pTagCond
;
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
...
...
@@ -2550,26 +2551,19 @@ typedef struct STableMergeScanInfo {
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
uint64_t
queryId
;
uint64_t
taskId
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
// int32_t prevGroupId; // previous table group id
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
...
...
@@ -2584,26 +2578,25 @@ typedef struct STableMergeScanInfo {
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
// int32_t* rowEntryInfoOffset;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
}
STableMergeScanInfo
;
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTableListInfo
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTa
gCond
,
pTagIndexCond
,
pTa
bleListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query,
TID:0x%"
PRIx64
", QID:0x%"
PRIx64
,
taskId
,
queryId
);
qDebug
(
"no table qualified for query,
%s"
PRIx64
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3027,8 +3020,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
}
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableMergeScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -3067,9 +3059,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
dataReaders
=
taosArrayInit
(
64
,
POINTER_BYTES
);
pInfo
->
queryId
=
queryId
;
pInfo
->
taskId
=
taskId
;
pInfo
->
sortSourceParams
=
taosArrayInit
(
64
,
sizeof
(
STableMergeScanSortSourceParam
));
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
pColMatchInfo
,
pInfo
->
cond
.
order
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
ea493034
...
...
@@ -2086,6 +2086,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
.
processFunc
=
topFunction
,
.
sprocessFunc
=
topScalarFunction
,
.
finalizeFunc
=
topBotFinalize
,
.
combineFunc
=
topCombine
,
.
pPartialFunc
=
"top"
,
...
...
@@ -2100,6 +2101,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
.
processFunc
=
bottomFunction
,
.
sprocessFunc
=
bottomScalarFunction
,
.
finalizeFunc
=
topBotFinalize
,
.
combineFunc
=
bottomCombine
,
.
pPartialFunc
=
"bottom"
,
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
ea493034
...
...
@@ -255,7 +255,7 @@ static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlo
static
int32_t
addDataBlockSlotsForProject
(
SPhysiPlanContext
*
pCxt
,
const
char
*
pStmtName
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
return
addDataBlockSlotsImpl
(
pCxt
,
pList
,
pDataBlockDesc
,
pStmtName
,
tru
e
,
false
);
return
addDataBlockSlotsImpl
(
pCxt
,
pList
,
pDataBlockDesc
,
pStmtName
,
fals
e
,
false
);
}
static
int32_t
pushdownDataBlockSlots
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
ea493034
...
...
@@ -2816,3 +2816,32 @@ int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
pOutput
->
numOfRows
=
numOfBins
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
selectScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
SColumnInfoData
*
pInputData
=
pInput
->
columnData
;
SColumnInfoData
*
pOutputData
=
pOutput
->
columnData
;
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInputData
,
i
))
{
colDataAppendNULL
(
pOutputData
,
0
);
continue
;
}
char
*
data
=
colDataGetData
(
pInputData
,
i
);
colDataAppend
(
pOutputData
,
i
,
data
,
false
);
}
pOutput
->
numOfRows
=
1
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
topScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
return
selectScalarFunction
(
pInput
,
inputNum
,
pOutput
);
}
int32_t
bottomScalarFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
return
selectScalarFunction
(
pInput
,
inputNum
,
pOutput
);
}
tests/system-test/2-query/last_row.py
浏览文件 @
ea493034
此差异已折叠。
点击以展开。
tests/system-test/fulltest.sh
浏览文件 @
ea493034
...
...
@@ -126,7 +126,7 @@ python3 ./test.py -f 2-query/count_partition.py
python3 ./test.py
-f
2-query/function_null.py
python3 ./test.py
-f
2-query/queryQnode.py
python3 ./test.py
-f
2-query/max_partition.py
python3 ./test.py
-f
2-query/last_row.py
python3 ./test.py
-f
6-cluster/5dnode1mnode.py
#BUG python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
...
...
@@ -274,6 +274,7 @@ python3 ./test.py -f 2-query/irate.py -Q 2
python3 ./test.py
-f
2-query/function_null.py
-Q
2
python3 ./test.py
-f
2-query/count_partition.py
-Q
2
python3 ./test.py
-f
2-query/max_partition.py
-Q
2
python3 ./test.py
-f
2-query/last_row.py
-Q
2
#------------querPolicy 3-----------
...
...
@@ -361,3 +362,4 @@ python3 ./test.py -f 2-query/irate.py -Q 3
python3 ./test.py
-f
2-query/function_null.py
-Q
3
python3 ./test.py
-f
2-query/count_partition.py
-Q
3
python3 ./test.py
-f
2-query/max_partition.py
-Q
3
python3 ./test.py
-f
2-query/last_row.py
-Q
3
taos-tools
@
0b8a3373
比较
b7b92226
...
0b8a3373
Subproject commit
b7b922268c4a06d9db77ffdfde0726f3d9900b72
Subproject commit
0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录