Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cf572edc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cf572edc
编写于
7月 23, 2022
作者:
H
Haojun Liao
提交者:
GitHub
7月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15323 from taosdata/feature/3_liaohj
fix(query): update the time range after filter data block.
上级
cd4f6c9a
72574b69
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
738 addition
and
663 deletion
+738
-663
include/common/tdatablock.h
include/common/tdatablock.h
+2
-2
include/libs/executor/executor.h
include/libs/executor/executor.h
+3
-1
source/client/test/CMakeLists.txt
source/client/test/CMakeLists.txt
+1
-1
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+12
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+4
-3
source/common/src/ttime.c
source/common/src/ttime.c
+8
-11
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+6
-5
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+78
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+432
-12
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+0
-424
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+163
-135
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+13
-55
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+6
-6
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+2
-2
tests/script/tsim/parser/fill.sim
tests/script/tsim/parser/fill.sim
+1
-0
未找到文件。
include/common/tdatablock.h
浏览文件 @
cf572edc
...
...
@@ -184,8 +184,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t
getJsonValueLen
(
const
char
*
data
);
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
numOfRow1
,
u
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
u
int32_t
numOfRow2
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRow1
,
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRow2
);
int32_t
colDataAssign
(
SColumnInfoData
*
pColumnInfoData
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRows
,
const
SDataBlockInfo
*
pBlockInfo
);
int32_t
blockDataUpdateTsWindow
(
SSDataBlock
*
pDataBlock
,
int32_t
tsColumnIndex
);
...
...
include/libs/executor/executor.h
浏览文件 @
cf572edc
...
...
@@ -65,7 +65,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @return
*/
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchema
Wrapper
);
SSchemaWrapper
**
pSchema
);
/**
* Set the input data block for the stream scan.
...
...
@@ -196,6 +196,8 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
int32_t
qStreamPrepareRecover
(
qTaskInfo_t
tinfo
,
int64_t
startVer
,
int64_t
endVer
);
STimeWindow
getAlignQueryTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
int64_t
key
);
#ifdef __cplusplus
}
#endif
...
...
source/client/test/CMakeLists.txt
浏览文件 @
cf572edc
...
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
clientTest clientTests.cpp
)
TARGET_LINK_LIBRARIES
(
clientTest
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom
executor
)
ADD_EXECUTABLE
(
tmqTest tmqTest.cpp
)
...
...
source/client/test/clientTests.cpp
浏览文件 @
cf572edc
...
...
@@ -27,6 +27,7 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "taos.h"
#include "executor.h"
namespace
{
void
showDB
(
TAOS
*
pConn
)
{
...
...
@@ -823,6 +824,17 @@ TEST(testCase, async_api_test) {
TEST
(
testCase
,
update_test
)
{
SInterval
interval
=
{
0
};
interval
.
offset
=
8000
;
interval
.
interval
=
10000
;
interval
.
sliding
=
4000
;
interval
.
intervalUnit
=
's'
;
interval
.
offsetUnit
=
's'
;
interval
.
slidingUnit
=
's'
;
// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000);
STimeWindow
w
=
getAlignQueryTimeWindow
(
&
interval
,
0
,
1629999999999
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
source/common/src/tdatablock.c
浏览文件 @
cf572edc
...
...
@@ -214,8 +214,8 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
}
}
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
numOfRow1
,
u
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
u
int32_t
numOfRow2
)
{
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRow1
,
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRow2
)
{
ASSERT
(
pColumnInfoData
!=
NULL
&&
pSource
!=
NULL
&&
pColumnInfoData
->
info
.
type
==
pSource
->
info
.
type
);
if
(
numOfRow2
==
0
)
{
return
numOfRow1
;
...
...
@@ -263,7 +263,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui
pColumnInfoData
->
varmeta
.
length
=
len
+
oldLen
;
}
else
{
if
(
finalNumOfRows
>
*
capacity
||
(
numOfRow1
==
0
&&
pColumnInfoData
->
info
.
bytes
!=
0
))
{
ASSERT
(
finalNumOfRows
*
pColumnInfoData
->
info
.
bytes
);
// all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char
*
tmp
=
taosMemoryRealloc
(
pColumnInfoData
->
pData
,
finalNumOfRows
*
pColumnInfoData
->
info
.
bytes
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
...
...
source/common/src/ttime.c
浏览文件 @
cf572edc
...
...
@@ -815,20 +815,17 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if
(
pInterval
->
offset
>
0
)
{
start
=
taosTimeAdd
(
start
,
pInterval
->
offset
,
pInterval
->
offsetUnit
,
precision
);
if
(
start
>
t
)
{
start
=
taosTimeAdd
(
start
,
-
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
);
}
else
{
// try to move current window to the left-hande-side, due to the offset effect.
int64_t
end
=
taosTimeAdd
(
start
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
-
1
;
int64_t
newEnd
=
end
;
while
(
newEnd
>=
t
)
{
end
=
newEnd
;
newEnd
=
taosTimeAdd
(
newEnd
,
-
pInterval
->
sliding
,
pInterval
->
slidingUnit
,
precision
);
}
// try to move current window to the left-hande-side, due to the offset effect.
int64_t
end
=
taosTimeAdd
(
start
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
-
1
;
start
=
taosTimeAdd
(
end
,
-
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
+
1
;
int64_t
newEnd
=
end
;
while
(
newEnd
>=
t
)
{
end
=
newEnd
;
newEnd
=
taosTimeAdd
(
newEnd
,
-
pInterval
->
sliding
,
pInterval
->
slidingUnit
,
precision
);
}
start
=
taosTimeAdd
(
end
,
-
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
+
1
;
}
return
start
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
cf572edc
...
...
@@ -108,6 +108,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes
doTranslateTagExpr
(
SNode
**
pNode
,
void
*
pContext
);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
STableListInfo
*
pListInfo
);
int32_t
getGroupIdFromTagsVal
(
void
*
pMeta
,
uint64_t
uid
,
SNodeList
*
pGroupNode
,
char
*
keyBuf
,
uint64_t
*
pGroupId
);
size_t
getTableTagsBufLen
(
const
SNodeList
*
pGroups
);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
...
...
@@ -129,6 +132,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
is
TableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
int32_t
is
QualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
cf572edc
...
...
@@ -422,6 +422,7 @@ typedef struct SStreamScanInfo {
// status for tmq
// SSchemaWrapper schema;
STqOffset
offset
;
SNodeList
*
pGroupTags
;
SNode
*
pTagCond
;
SNode
*
pTagIndexCond
;
}
SStreamScanInfo
;
...
...
@@ -544,9 +545,10 @@ typedef struct SProjectOperatorInfo {
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SSDataBlock
*
existDataBlock
;
SArray
*
pPseudoColInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
...
...
@@ -803,7 +805,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
...
...
@@ -867,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
S
TimeWindowAggSupp
*
pTwAggSup
,
S
ExecTaskInfo
*
pTaskInfo
);
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -877,8 +879,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
/*SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
cf572edc
...
...
@@ -265,7 +265,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
int32_t
is
TableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
)
{
int32_t
is
QualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SMetaReader
mr
=
{
0
};
...
...
@@ -356,7 +356,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
STableKeyInfo
*
info
=
taosArrayGet
(
pListInfo
->
pTableList
,
i
);
bool
qualified
=
true
;
code
=
is
TableOk
(
info
,
pTagCond
,
metaHandle
,
&
qualified
);
code
=
is
QualifiedTable
(
info
,
pTagCond
,
metaHandle
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -379,6 +379,82 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
return
code
;
}
size_t
getTableTagsBufLen
(
const
SNodeList
*
pGroups
)
{
size_t
keyLen
=
0
;
SNode
*
node
;
FOREACH
(
node
,
pGroups
)
{
SExprNode
*
pExpr
=
(
SExprNode
*
)
node
;
keyLen
+=
pExpr
->
resType
.
bytes
;
}
keyLen
+=
sizeof
(
int8_t
)
*
LIST_LENGTH
(
pGroups
);
return
keyLen
;
}
int32_t
getGroupIdFromTagsVal
(
void
*
pMeta
,
uint64_t
uid
,
SNodeList
*
pGroupNode
,
char
*
keyBuf
,
uint64_t
*
pGroupId
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pMeta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
SNodeList
*
groupNew
=
nodesCloneList
(
pGroupNode
);
nodesRewriteExprsPostOrder
(
groupNew
,
doTranslateTagExpr
,
&
mr
);
char
*
isNull
=
(
char
*
)
keyBuf
;
char
*
pStart
=
(
char
*
)
keyBuf
+
sizeof
(
int8_t
)
*
LIST_LENGTH
(
pGroupNode
);
SNode
*
pNode
;
int32_t
index
=
0
;
FOREACH
(
pNode
,
groupNew
)
{
SNode
*
pNew
=
NULL
;
int32_t
code
=
scalarCalculateConstants
(
pNode
,
&
pNew
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
pNew
);
}
else
{
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
code
;
}
ASSERT
(
nodeType
(
pNew
)
==
QUERY_NODE_VALUE
);
SValueNode
*
pValue
=
(
SValueNode
*
)
pNew
;
if
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_NULL
||
pValue
->
isNull
)
{
isNull
[
index
++
]
=
1
;
continue
;
}
else
{
isNull
[
index
++
]
=
0
;
char
*
data
=
nodesGetValueFromNode
(
pValue
);
if
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
tTagIsJson
(
data
))
{
terrno
=
TSDB_CODE_QRY_JSON_IN_GROUP_ERROR
;
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
int32_t
len
=
getJsonValueLen
(
data
);
memcpy
(
pStart
,
data
,
len
);
pStart
+=
len
;
}
else
if
(
IS_VAR_DATA_TYPE
(
pValue
->
node
.
resType
.
type
))
{
memcpy
(
pStart
,
data
,
varDataTLen
(
data
));
pStart
+=
varDataTLen
(
data
);
}
else
{
memcpy
(
pStart
,
data
,
pValue
->
node
.
resType
.
bytes
);
pStart
+=
pValue
->
node
.
resType
.
bytes
;
}
}
}
int32_t
len
=
(
int32_t
)(
pStart
-
(
char
*
)
keyBuf
);
*
pGroupId
=
calcGroupId
(
keyBuf
,
len
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
TSDB_CODE_SUCCESS
;
}
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
)
{
if
(
!
pNodeList
)
{
return
NULL
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
cf572edc
...
...
@@ -14,10 +14,21 @@
*/
#include "executor.h"
#include "tref.h"
#include "executorimpl.h"
#include "planner.h"
#include "tdatablock.h"
#include "vnode.h"
#include "tudf.h"
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
static
void
cleanupRefPool
()
{
int32_t
ref
=
atomic_val_compare_exchange_32
(
&
exchangeObjRefPool
,
exchangeObjRefPool
,
0
);
taosCloseRef
(
ref
);
}
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
char
*
id
)
{
...
...
@@ -120,8 +131,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return
code
;
}
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchemaWrapper
)
{
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchema
)
{
if
(
msg
==
NULL
)
{
// TODO create raw scan
return
NULL
;
...
...
@@ -155,7 +165,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
}
}
*
pSchema
Wrapper
=
tCloneSSchemaWrapper
(((
SExecTaskInfo
*
)
pTaskInfo
)
->
schemaInfo
.
qsw
);
*
pSchema
=
tCloneSSchemaWrapper
(((
SExecTaskInfo
*
)
pTaskInfo
)
->
schemaInfo
.
qsw
);
return
pTaskInfo
;
}
...
...
@@ -185,8 +195,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return
pTaskInfo
;
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
,
const
char
*
idstr
)
{
static
SArray
*
filterUnqualifiedTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
,
const
char
*
idstr
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
...
...
@@ -209,7 +218,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
if
(
pScanInfo
->
pTagCond
!=
NULL
)
{
bool
qualified
=
false
;
STableKeyInfo
info
=
{.
groupId
=
0
,
.
uid
=
mr
.
me
.
uid
};
code
=
is
TableOk
(
&
info
,
pScanInfo
->
pTagCond
,
pScanInfo
->
readHandle
.
meta
,
&
qualified
);
code
=
is
QualifiedTable
(
&
info
,
pScanInfo
->
pTagCond
,
pScanInfo
->
readHandle
.
meta
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to filter new table, uid:0x%"
PRIx64
", %s"
,
info
.
uid
,
idstr
);
continue
;
...
...
@@ -240,7 +249,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
int32_t
code
=
0
;
SStreamScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filter
QualifiedChil
dTables
(
pScanInfo
,
tableIdList
,
GET_TASKID
(
pTaskInfo
));
SArray
*
qa
=
filter
Unqualifie
dTables
(
pScanInfo
,
tableIdList
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqReaderAddTbUidList
(
pScanInfo
->
tqReader
,
qa
);
...
...
@@ -248,17 +257,35 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return
code
;
}
// add to qTaskInfo
// todo refactor STableList
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
qa
);
++
i
)
{
uint64_t
*
uid
=
taosArrayGet
(
qa
,
i
);
qDebug
(
"table %ld added to task info"
,
*
uid
);
size_t
bufLen
=
(
pScanInfo
->
pGroupTags
!=
NULL
)
?
getTableTagsBufLen
(
pScanInfo
->
pGroupTags
)
:
0
;
char
*
keyBuf
=
NULL
;
if
(
bufLen
>
0
)
{
keyBuf
=
taosMemoryMalloc
(
bufLen
);
if
(
keyBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
qa
);
++
i
)
{
uint64_t
*
uid
=
taosArrayGet
(
qa
,
i
);
STableKeyInfo
keyInfo
=
{.
uid
=
*
uid
,
.
groupId
=
0
};
if
(
bufLen
>
0
)
{
code
=
getGroupIdFromTagsVal
(
pScanInfo
->
readHandle
.
meta
,
keyInfo
.
uid
,
pScanInfo
->
pGroupTags
,
keyBuf
,
&
keyInfo
.
groupId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
taosArrayPush
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
&
keyInfo
);
}
if
(
keyBuf
!=
NULL
)
{
taosMemoryFree
(
keyBuf
);
}
taosArrayDestroy
(
qa
);
}
else
{
// remove the table id in current list
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
...
...
@@ -292,3 +319,396 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
return
0
;
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
const
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
sql
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SDataSinkMgtCfg
cfg
=
{.
maxDataBlockNum
=
1000
,
.
maxDataBlockNumPerQuery
=
100
};
code
=
dsDataSinkMgtInit
(
&
cfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
handle
)
{
void
*
pSinkParam
=
NULL
;
code
=
createDataSinkParam
(
pSubplan
->
pDataSink
,
&
pSinkParam
,
pTaskInfo
,
readHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
);
}
_error:
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
#ifdef TEST_IMPL
// wait moment
int
waitMoment
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
)
{
pos
+=
3
;
ms
=
atoi
(
pos
);
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
)
{
pos
++
;
}
char
unit_char
=
*
pos
;
if
(
unit_char
==
'h'
)
{
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
)
{
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
)
{
ms
*=
1000
;
}
}
if
(
ms
==
0
)
return
0
;
printf
(
"test wait sleep %dms. sql=%s ...
\n
"
,
ms
,
pQInfo
->
sql
);
if
(
ms
<
1000
)
{
taosMsleep
(
ms
);
}
else
{
int
used_ms
=
0
;
while
(
used_ms
<
ms
)
{
taosMsleep
(
1000
);
used_ms
+=
1000
;
if
(
isTaskKilled
(
pQInfo
))
{
printf
(
"test check query is canceled, sleep break.%s
\n
"
,
pQInfo
->
sql
);
break
;
}
}
}
}
return
1
;
}
#endif
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
*
pRes
=
NULL
;
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"%s-%p execTask is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
}
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
pTaskInfo
->
cost
.
start
=
taosGetTimestampMs
();
}
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"%s already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_SUCCESS
;
}
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pTaskInfo
->
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
ret
;
cleanUpUdfs
();
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
}
qDebug
(
"%s execTask is launched"
,
GET_TASKID
(
pTaskInfo
));
int64_t
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
fpSet
.
getNextFn
(
pTaskInfo
->
pRoot
);
uint64_t
el
=
(
taosGetTimestampUs
()
-
st
);
pTaskInfo
->
cost
.
elapsedTime
+=
el
;
if
(
NULL
==
*
pRes
)
{
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
cleanUpUdfs
();
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
uint64_t
total
=
pTaskInfo
->
pRoot
->
resultInfo
.
totalRows
;
qDebug
(
"%s task suspended, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d, elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
current
,
total
,
0
,
el
/
1000
.
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
}
int32_t
qKillTask
(
qTaskInfo_t
qinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qinfo
;
if
(
pTaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qAsyncKillTask
(
qinfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
pTaskInfo
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qAsyncKillTask
(
qTaskInfo_t
qinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qinfo
;
if
(
pTaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"%s execTask async killed"
,
GET_TASKID
(
pTaskInfo
));
setTaskKilled
(
pTaskInfo
);
return
TSDB_CODE_SUCCESS
;
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
if
(
pTaskInfo
==
NULL
)
{
return
;
}
qDebug
(
"%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
pRoot
->
resultInfo
.
totalRows
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
capacity
=
0
;
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
}
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
)
{
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
if
(
pTaskInfo
->
pRoot
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
nOptrWithVal
=
0
;
int32_t
code
=
encodeOperator
(
pTaskInfo
->
pRoot
,
pOutput
,
len
,
&
nOptrWithVal
);
if
((
code
==
TSDB_CODE_SUCCESS
)
&&
(
nOptrWithVal
=
0
))
{
taosMemoryFreeClear
(
*
pOutput
);
*
len
=
0
;
}
return
code
;
}
int32_t
qDeserializeTaskStatus
(
qTaskInfo_t
tinfo
,
const
char
*
pInput
,
int32_t
len
)
{
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
if
(
pTaskInfo
==
NULL
||
pInput
==
NULL
||
len
==
0
)
{
return
TSDB_CODE_INVALID_PARA
;
}
return
decodeOperator
(
pTaskInfo
->
pRoot
,
pInput
,
len
);
}
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
*
scanner
=
pOperator
->
info
;
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
}
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
return 0;
}
#endif
int32_t
qStreamPrepareRecover
(
qTaskInfo_t
tinfo
,
int64_t
startVer
,
int64_t
endVer
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
pTaskInfo
->
streamInfo
.
recoverStartVer
=
startVer
;
pTaskInfo
->
streamInfo
.
recoverEndVer
=
endVer
;
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__PREPARE
;
return
0
;
}
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
(
void
*
)
pInfo
->
tqReader
;
}
const
SSchemaWrapper
*
qExtractSchemaFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
pInfo
->
tqReader
->
pSchemaWrapper
;
}
const
STqOffset
*
qExtractStatusFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
&
pInfo
->
offset
;
}
void
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
return
pTaskInfo
->
streamInfo
.
metaBlk
;
}
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
memcpy
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
,
sizeof
(
STqOffsetVal
));
return
0
;
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
if
(
!
tOffsetEqual
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
))
{
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
#endif
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
+
1
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
+
1
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
else
{
return
-
1
;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
tableSz
=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
#ifndef NDEBUG
qDebug
(
"switch to next table %ld (cursor %d), %ld rows returned"
,
uid
,
pTableScanInfo
->
currentTable
,
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
);
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
=
0
;
#endif
bool
found
=
false
;
for
(
int32_t
i
=
0
;
i
<
tableSz
;
i
++
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
i
);
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
break
;
}
}
// TODO after dropping table, table may be not found
ASSERT
(
found
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
if
(
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
pTableScanInfo
->
cond
,
pTaskInfo
->
tableqinfoList
.
pTableList
,
&
pTableScanInfo
->
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
dataReader
==
NULL
)
{
ASSERT
(
0
);
}
}
tsdbSetTableId
(
pTableScanInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pTableScanInfo
->
cond
.
twindows
.
skey
;
pTableScanInfo
->
cond
.
twindows
.
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
pTableScanInfo
->
cond
.
twindows
.
skey
=
oldSkey
;
pTableScanInfo
->
scanTimes
=
0
;
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
tableSz
);
/*}*/
}
else
{
ASSERT
(
0
);
}
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
}
return
0
;
}
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
}
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
#endif
source/libs/executor/src/executorMain.c
已删除
100644 → 0
浏览文件 @
cd4f6c9a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkMgt.h"
#include "os.h"
#include "tmsg.h"
#include "tref.h"
#include "tudf.h"
#include "executor.h"
#include "executorimpl.h"
#include "query.h"
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
static
void
cleanupRefPool
()
{
int32_t
ref
=
atomic_val_compare_exchange_32
(
&
exchangeObjRefPool
,
exchangeObjRefPool
,
0
);
taosCloseRef
(
ref
);
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
const
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
sql
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SDataSinkMgtCfg
cfg
=
{.
maxDataBlockNum
=
1000
,
.
maxDataBlockNumPerQuery
=
100
};
code
=
dsDataSinkMgtInit
(
&
cfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
handle
)
{
void
*
pSinkParam
=
NULL
;
code
=
createDataSinkParam
(
pSubplan
->
pDataSink
,
&
pSinkParam
,
pTaskInfo
,
readHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
);
}
_error:
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
#ifdef TEST_IMPL
// wait moment
int
waitMoment
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
)
{
pos
+=
3
;
ms
=
atoi
(
pos
);
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
)
{
pos
++
;
}
char
unit_char
=
*
pos
;
if
(
unit_char
==
'h'
)
{
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
)
{
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
)
{
ms
*=
1000
;
}
}
if
(
ms
==
0
)
return
0
;
printf
(
"test wait sleep %dms. sql=%s ...
\n
"
,
ms
,
pQInfo
->
sql
);
if
(
ms
<
1000
)
{
taosMsleep
(
ms
);
}
else
{
int
used_ms
=
0
;
while
(
used_ms
<
ms
)
{
taosMsleep
(
1000
);
used_ms
+=
1000
;
if
(
isTaskKilled
(
pQInfo
))
{
printf
(
"test check query is canceled, sleep break.%s
\n
"
,
pQInfo
->
sql
);
break
;
}
}
}
}
return
1
;
}
#endif
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
*
pRes
=
NULL
;
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"%s-%p execTask is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
}
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
pTaskInfo
->
cost
.
start
=
taosGetTimestampMs
();
}
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"%s already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_SUCCESS
;
}
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pTaskInfo
->
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
ret
;
cleanUpUdfs
();
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
}
qDebug
(
"%s execTask is launched"
,
GET_TASKID
(
pTaskInfo
));
int64_t
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
fpSet
.
getNextFn
(
pTaskInfo
->
pRoot
);
uint64_t
el
=
(
taosGetTimestampUs
()
-
st
);
pTaskInfo
->
cost
.
elapsedTime
+=
el
;
if
(
NULL
==
*
pRes
)
{
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
cleanUpUdfs
();
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
uint64_t
total
=
pTaskInfo
->
pRoot
->
resultInfo
.
totalRows
;
qDebug
(
"%s task suspended, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d, elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
current
,
total
,
0
,
el
/
1000
.
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
}
int32_t
qKillTask
(
qTaskInfo_t
qinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qinfo
;
if
(
pTaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"%s execTask killed"
,
GET_TASKID
(
pTaskInfo
));
setTaskKilled
(
pTaskInfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
pTaskInfo
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qAsyncKillTask
(
qTaskInfo_t
qinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qinfo
;
if
(
pTaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"%s execTask async killed"
,
GET_TASKID
(
pTaskInfo
));
setTaskKilled
(
pTaskInfo
);
return
TSDB_CODE_SUCCESS
;
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
if
(
pTaskInfo
==
NULL
)
{
return
;
}
qDebug
(
"%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
pRoot
->
resultInfo
.
totalRows
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
capacity
=
0
;
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
}
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
)
{
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
if
(
pTaskInfo
->
pRoot
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
nOptrWithVal
=
0
;
int32_t
code
=
encodeOperator
(
pTaskInfo
->
pRoot
,
pOutput
,
len
,
&
nOptrWithVal
);
if
((
code
==
TSDB_CODE_SUCCESS
)
&&
(
nOptrWithVal
=
0
))
{
taosMemoryFreeClear
(
*
pOutput
);
*
len
=
0
;
}
return
code
;
}
int32_t
qDeserializeTaskStatus
(
qTaskInfo_t
tinfo
,
const
char
*
pInput
,
int32_t
len
)
{
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
if
(
pTaskInfo
==
NULL
||
pInput
==
NULL
||
len
==
0
)
{
return
TSDB_CODE_INVALID_PARA
;
}
return
decodeOperator
(
pTaskInfo
->
pRoot
,
pInput
,
len
);
}
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
*
scanner
=
pOperator
->
info
;
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
}
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
return 0;
}
#endif
int32_t
qStreamPrepareRecover
(
qTaskInfo_t
tinfo
,
int64_t
startVer
,
int64_t
endVer
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
pTaskInfo
->
streamInfo
.
recoverStartVer
=
startVer
;
pTaskInfo
->
streamInfo
.
recoverEndVer
=
endVer
;
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__PREPARE
;
return
0
;
}
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
(
void
*
)
pInfo
->
tqReader
;
}
const
SSchemaWrapper
*
qExtractSchemaFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
pInfo
->
tqReader
->
pSchemaWrapper
;
}
const
STqOffset
*
qExtractStatusFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
&
pInfo
->
offset
;
}
void
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
return
pTaskInfo
->
streamInfo
.
metaBlk
;
}
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
memcpy
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
,
sizeof
(
STqOffsetVal
));
return
0
;
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
if
(
!
tOffsetEqual
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
))
{
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
#endif
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
+
1
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
+
1
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
else
{
return
-
1
;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
tableSz
=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
#ifndef NDEBUG
qDebug
(
"switch to next table %ld (cursor %d), %ld rows returned"
,
uid
,
pTableScanInfo
->
currentTable
,
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
);
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
=
0
;
#endif
bool
found
=
false
;
for
(
int32_t
i
=
0
;
i
<
tableSz
;
i
++
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
i
);
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
break
;
}
}
// TODO after dropping table, table may be not found
ASSERT
(
found
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
if
(
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
pTableScanInfo
->
cond
,
pTaskInfo
->
tableqinfoList
.
pTableList
,
&
pTableScanInfo
->
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
dataReader
==
NULL
)
{
ASSERT
(
0
);
}
}
tsdbSetTableId
(
pTableScanInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pTableScanInfo
->
cond
.
twindows
.
skey
;
pTableScanInfo
->
cond
.
twindows
.
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
pTableScanInfo
->
cond
.
twindows
.
skey
=
oldSkey
;
pTableScanInfo
->
scanTimes
=
0
;
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
tableSz
);
/*}*/
}
else
{
ASSERT
(
0
);
}
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
}
return
0
;
}
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
}
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
#endif
source/libs/executor/src/executorimpl.c
浏览文件 @
cf572edc
...
...
@@ -1333,7 +1333,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
)
{
if
(
pFilterNode
==
NULL
||
pBlock
->
info
.
rows
==
0
)
{
return
;
}
...
...
@@ -1354,6 +1354,20 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
filterFreeInfo
(
filter
);
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
);
if
(
pColMatchInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
);
++
i
)
{
SColMatchInfo
*
pInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
if
(
pInfo
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
targetSlotId
);
if
(
pColData
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
targetSlotId
);
break
;
}
}
}
}
taosMemoryFree
(
rowRes
);
}
...
...
@@ -3043,7 +3057,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
if
(
!
hasDataInGroupInfo
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
@@ -3209,6 +3223,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
...
...
@@ -3221,6 +3236,11 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
}
}
// here we reach the start position, according to the limit/offset requirements.
...
...
@@ -3265,7 +3285,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
blockDataCleanup
(
pRes
);
SSDataBlock
*
pFinalRes
=
pProjectInfo
->
pFinalRes
;
blockDataCleanup
(
pFinalRes
);
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
...
...
@@ -3276,24 +3298,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return
NULL
;
}
#if 0
if (pProjectInfo->existDataBlock) { // TODO refactor
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL;
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
resetResultRowEntryResult(pInfo->pCtx, pOperator->exprSupp.numOfExprs);
return pRes;
}
}
#endif
int64_t
st
=
0
;
int32_t
order
=
0
;
int32_t
scanFlag
=
0
;
...
...
@@ -3303,67 +3307,132 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SLimitInfo
*
pLimitInfo
=
&
pProjectInfo
->
limitInfo
;
while
(
1
)
{
// The downstream exec may change the value of the newgroup, so use a local variable instead.
qDebug
(
"projection call next"
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
qDebug
(
"projection get null"
);
while
(
1
)
{
while
(
1
)
{
blockDataCleanup
(
pRes
);
/*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
doSetOperatorCompleted
(
pOperator
);
/*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
/*pOperator->status = OP_RES_TO_RETURN;*/
/*}*/
break
;
}
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
)
{
// for stream interval
return
pBlock
;
}
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
pOperator
->
pDownstream
[
0
],
&
order
,
&
scanFlag
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
)
{
// for stream interval
return
pBlock
;
}
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
||
pLimitInfo
->
currentGroupId
==
pBlock
->
info
.
groupId
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
continue
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
code
=
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
continue
;
}
}
// set current group id of the project operator
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// remainGroupOffset == 0
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
// reset the value for a new group data
// existing rows that belongs to previous group.
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
}
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
pOperator
->
pDownstream
[
0
],
&
order
,
&
scanFlag
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
code
=
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
int32_t
status
=
handleLimitOffset
(
pOperator
,
&
pProjectInfo
->
limitInfo
,
pInfo
->
pRes
,
true
);
// set current group id
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
// filter shall be applied after apply functions and limit/offset on the result
doFilter
(
pProjectInfo
->
pFilterNode
,
pInfo
->
pRes
);
if
(
pLimitInfo
->
remainOffset
>=
pInfo
->
pRes
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pInfo
->
pRes
->
info
.
rows
;
blockDataCleanup
(
pInfo
->
pRes
);
continue
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pInfo
->
pRes
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pInfo
->
pRes
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
}
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pInfo
->
pRes
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pInfo
->
pRes
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
}
pLimitInfo
->
numOfOutputRows
+=
pInfo
->
pRes
->
info
.
rows
;
break
;
}
if
(
status
==
PROJECT_RETRIEVE_CONTINUE
||
pInfo
->
pRes
->
info
.
rows
==
0
)
{
continue
;
}
else
if
(
status
==
PROJECT_RETRIEVE_DONE
)
{
// no results generated
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
(
!
pProjectInfo
->
mergeDataBlocks
))
{
break
;
}
}
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
pProjectInfo
->
limitInfo
.
numOfOutputRows
+=
rows
;
if
(
pProjectInfo
->
mergeDataBlocks
)
{
pFinalRes
->
info
.
groupId
=
pInfo
->
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pInfo
->
pRes
->
info
.
version
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pInfo
->
pRes
);
if
(
pFinalRes
->
info
.
rows
+
pInfo
->
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
}
}
// do apply filter
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
doFilter
(
pProjectInfo
->
pFilterNode
,
p
,
NULL
);
if
(
p
->
info
.
rows
>
0
)
{
break
;
}
}
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
pOperator
->
resultInfo
.
totalRows
+=
p
->
info
.
rows
;
if
(
pOperator
->
cost
.
openCost
==
0
)
{
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
return
(
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
static
void
doHandleRemainBlockForNewGroupImpl
(
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
...
...
@@ -3492,7 +3561,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break
;
}
doFilter
(
pInfo
->
pCondition
,
fillResult
);
doFilter
(
pInfo
->
pCondition
,
fillResult
,
pInfo
->
pColMatchColInfo
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
...
...
@@ -3755,6 +3824,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupAggSup
(
&
pInfo
->
aggSup
);
taosArrayDestroy
(
pInfo
->
pPseudoColInfo
);
blockDataDestroy
(
pInfo
->
pFinalRes
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -3814,7 +3884,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
int32_t
numOfRows
=
4096
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
...
...
@@ -3950,7 +4023,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
if
(
rows
>
0
||
pOperator
->
status
==
OP_EXEC_DONE
)
{
break
;
...
...
@@ -4134,9 +4207,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return
pTaskInfo
;
}
static
STsdbReader
*
doCreateDataReader
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
const
char
*
idstr
);
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
);
...
...
@@ -4177,9 +4247,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
}
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pScanNode
->
pScanCols
);
int32_t
numOfCols
=
LIST_LENGTH
(
pScanNode
->
pScanCols
);
int32_t
numOfTags
=
LIST_LENGTH
(
pScanNode
->
pScanPseudoCols
);
SSchemaWrapper
*
pqSw
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
pqSw
->
pSchema
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SSchema
));
pqSw
->
pSchema
=
taosMemoryCalloc
(
numOfCols
+
numOfTags
,
sizeof
(
SSchema
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanCols
,
i
);
...
...
@@ -4192,6 +4264,22 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
strncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
// this the tags and pseudo function columns, we only keep the tag columns
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanPseudoCols
,
i
);
int32_t
type
=
nodeType
(
pNode
->
pExpr
);
if
(
type
==
QUERY_NODE_COLUMN
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
bytes
;
strncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
}
return
pqSw
;
}
...
...
@@ -4293,69 +4381,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t
groupNum
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
info
->
uid
);
SNodeList
*
groupNew
=
nodesCloneList
(
group
);
nodesRewriteExprsPostOrder
(
groupNew
,
doTranslateTagExpr
,
&
mr
);
char
*
isNull
=
(
char
*
)
keyBuf
;
char
*
pStart
=
(
char
*
)
keyBuf
+
nullFlagSize
;
SNode
*
pNode
;
int32_t
index
=
0
;
FOREACH
(
pNode
,
groupNew
)
{
SNode
*
pNew
=
NULL
;
int32_t
code
=
scalarCalculateConstants
(
pNode
,
&
pNew
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
pNew
);
}
else
{
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
code
;
}
ASSERT
(
nodeType
(
pNew
)
==
QUERY_NODE_VALUE
);
SValueNode
*
pValue
=
(
SValueNode
*
)
pNew
;
if
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_NULL
||
pValue
->
isNull
)
{
isNull
[
index
++
]
=
1
;
continue
;
}
else
{
isNull
[
index
++
]
=
0
;
char
*
data
=
nodesGetValueFromNode
(
pValue
);
if
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
tTagIsJson
(
data
))
{
terrno
=
TSDB_CODE_QRY_JSON_IN_GROUP_ERROR
;
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
int32_t
len
=
getJsonValueLen
(
data
);
memcpy
(
pStart
,
data
,
len
);
pStart
+=
len
;
}
else
if
(
IS_VAR_DATA_TYPE
(
pValue
->
node
.
resType
.
type
))
{
memcpy
(
pStart
,
data
,
varDataTLen
(
data
));
pStart
+=
varDataTLen
(
data
);
}
else
{
memcpy
(
pStart
,
data
,
pValue
->
node
.
resType
.
bytes
);
pStart
+=
pValue
->
node
.
resType
.
bytes
;
}
}
int32_t
code
=
getGroupIdFromTagsVal
(
pHandle
->
meta
,
info
->
uid
,
group
,
keyBuf
,
&
info
->
groupId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int32_t
len
=
(
int32_t
)(
pStart
-
(
char
*
)
keyBuf
);
uint64_t
groupId
=
calcGroupId
(
keyBuf
,
len
);
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
&
groupId
,
sizeof
(
uint64_t
));
info
->
groupId
=
groupId
;
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
&
info
->
groupId
,
sizeof
(
uint64_t
));
groupNum
++
;
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
}
taosMemoryFree
(
keyBuf
);
if
(
pTableListInfo
->
needSortTableByGroupId
)
{
...
...
@@ -4443,12 +4477,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
createExchangeOperatorInfo
(
pHandle
->
pMsgCb
->
clientRpc
,
(
SExchangePhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
==
type
)
{
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
STimeWindowAggSupp
aggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pTableScanNode
->
watermark
,
.
calTrigger
=
pTableScanNode
->
triggerType
,
.
maxTs
=
INT64_MIN
,
};
if
(
pHandle
->
vnode
)
{
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
...
...
@@ -4468,7 +4496,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
#endif
pTaskInfo
->
schemaInfo
.
qsw
=
extractQueriedColumnSchema
(
&
pTableScanNode
->
scan
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTagCond
,
&
aggSup
,
pTaskInfo
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTagCond
,
pTaskInfo
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
cf572edc
...
...
@@ -299,7 +299,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
cf572edc
...
...
@@ -211,7 +211,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break
;
}
if
(
pJoinInfo
->
pCondAfterMerge
!=
NULL
)
{
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
);
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
,
NULL
);
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
cf572edc
...
...
@@ -264,7 +264,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
...
...
@@ -273,6 +273,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost
->
filterOutBlocks
+=
1
;
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
}
else
{
qDebug
(
"%s data block filter out, elapsed time:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
(
et
-
st
));
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1134,7 +1136,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
return
0
;
...
...
@@ -1415,7 +1417,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
S
TimeWindowAggSupp
*
pTwSup
,
S
ExecTaskInfo
*
pTaskInfo
)
{
SExecTaskInfo
*
pTaskInfo
)
{
SStreamScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -1428,8 +1430,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
pInfo
->
pTagCond
=
pTagCond
;
pInfo
->
twAggSup
=
*
pTwSup
;
pInfo
->
pGroupTags
=
pTableScanNode
->
pGroupTags
;
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pTableScanNode
->
watermark
,
.
calTrigger
=
pTableScanNode
->
triggerType
,
.
maxTs
=
INT64_MIN
,
};
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
...
...
@@ -1641,55 +1647,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
#if 0
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
code = filterSetDataFromSlotId(filter, ¶m1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
// TODO refactor
int32_t numOfRow = 0;
for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
if (keep) {
colDataAssign(pDest, pSrc, pInfo->pRes->info.rows, &px->info);
numOfRow = pInfo->pRes->info.rows;
} else if (NULL != rowRes) {
numOfRow = 0;
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
if (colDataIsNull_s(pSrc, j)) {
colDataAppendNULL(pDest, numOfRow);
} else {
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1;
}
} else {
numOfRow = 0;
}
}
px->info.rows = numOfRow;
pInfo->pRes = px;
#endif
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -2657,7 +2615,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
cf572edc
...
...
@@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pBlock
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
pInfo
->
pColMatchInfo
);
if
(
blockDataGetNumOfRows
(
pBlock
)
==
0
)
{
continue
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
cf572edc
...
...
@@ -1178,7 +1178,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1219,7 +1219,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1256,7 +1256,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBlock
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBlock
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1970,7 +1970,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -2014,7 +2014,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -4638,7 +4638,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
doFilter
(
miaInfo
->
pCondition
,
pRes
);
doFilter
(
miaInfo
->
pCondition
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
break
;
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
cf572edc
...
...
@@ -99,7 +99,7 @@
./test.sh -f tsim/parser/commit.sim
# TD-17661 ./test.sh -f tsim/parser/condition.sim
./test.sh -f tsim/parser/constCol.sim
./test.sh -f tsim/parser/create_db.sim
#
./test.sh -f tsim/parser/create_db.sim
./test.sh -f tsim/parser/create_mt.sim
# TD-17653 ./test.sh -f tsim/parser/create_tb_with_tag_name.sim
./test.sh -f tsim/parser/create_tb.sim
...
...
@@ -223,7 +223,7 @@
# ---- stream
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
#
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
...
...
tests/script/tsim/parser/fill.sim
浏览文件 @
cf572edc
...
...
@@ -960,6 +960,7 @@ endi
sql select _wstart, max(k)-min(k),last(k)-first(k),0-spread(k) from tm0 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 4:2:15' interval(500a) fill(value, 99,91,90,89,88,87,86,85) ;
if $rows != 21749 then
print expect 21749, actual: $rows
return -1
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录