Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4d4a5224
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4d4a5224
编写于
7月 22, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): update the time range after filter data block.
上级
2b199e7a
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
35 addition
and
67 deletion
+35
-67
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+19
-5
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+6
-52
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+6
-6
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
4d4a5224
...
...
@@ -807,7 +807,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4d4a5224
...
...
@@ -1333,7 +1333,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
)
{
if
(
pFilterNode
==
NULL
||
pBlock
->
info
.
rows
==
0
)
{
return
;
}
...
...
@@ -1354,6 +1354,20 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
filterFreeInfo
(
filter
);
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
);
if
(
pColMatchInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
);
++
i
)
{
SColMatchInfo
*
pInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
if
(
pInfo
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
targetSlotId
);
if
(
pColData
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
targetSlotId
);
break
;
}
}
}
}
taosMemoryFree
(
rowRes
);
}
...
...
@@ -3040,7 +3054,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
if
(
!
hasDataInGroupInfo
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
@@ -3401,7 +3415,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// do apply filter
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
doFilter
(
pProjectInfo
->
pFilterNode
,
p
);
doFilter
(
pProjectInfo
->
pFilterNode
,
p
,
NULL
);
if
(
p
->
info
.
rows
>
0
)
{
break
;
}
...
...
@@ -3543,7 +3557,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break
;
}
doFilter
(
pInfo
->
pCondition
,
fillResult
);
doFilter
(
pInfo
->
pCondition
,
fillResult
,
pInfo
->
pColMatchColInfo
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
...
...
@@ -4005,7 +4019,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
if
(
rows
>
0
||
pOperator
->
status
==
OP_EXEC_DONE
)
{
break
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
4d4a5224
...
...
@@ -299,7 +299,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
4d4a5224
...
...
@@ -211,7 +211,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break
;
}
if
(
pJoinInfo
->
pCondAfterMerge
!=
NULL
)
{
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
);
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
,
NULL
);
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4d4a5224
...
...
@@ -265,7 +265,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
...
...
@@ -274,6 +274,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost
->
filterOutBlocks
+=
1
;
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
}
else
{
qDebug
(
"%s data block filter out, elapsed time:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
(
et
-
st
));
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1225,7 +1227,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
return
0
;
...
...
@@ -1761,55 +1763,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
#if 0
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
code = filterSetDataFromSlotId(filter, ¶m1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
// TODO refactor
int32_t numOfRow = 0;
for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
if (keep) {
colDataAssign(pDest, pSrc, pInfo->pRes->info.rows, &px->info);
numOfRow = pInfo->pRes->info.rows;
} else if (NULL != rowRes) {
numOfRow = 0;
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
if (colDataIsNull_s(pSrc, j)) {
colDataAppendNULL(pDest, numOfRow);
} else {
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1;
}
} else {
numOfRow = 0;
}
}
px->info.rows = numOfRow;
pInfo->pRes = px;
#endif
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -2777,7 +2731,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
4d4a5224
...
...
@@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pBlock
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
pInfo
->
pColMatchInfo
);
if
(
blockDataGetNumOfRows
(
pBlock
)
==
0
)
{
continue
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
4d4a5224
...
...
@@ -1178,7 +1178,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1219,7 +1219,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1256,7 +1256,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBlock
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBlock
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1969,7 +1969,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -2013,7 +2013,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -4626,7 +4626,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
doFilter
(
miaInfo
->
pCondition
,
pRes
);
doFilter
(
miaInfo
->
pCondition
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
break
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录