Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9b6f008c
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9b6f008c
编写于
3月 29, 2022
作者:
H
Haojun Liao
提交者:
GitHub
3月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11082 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
上级
c05368f0
14e69034
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
313 addition
and
314 deletion
+313
-314
2.0/src/query/inc/qExecutor.h
2.0/src/query/inc/qExecutor.h
+3
-3
2.0/src/query/src/qExecutor.c
2.0/src/query/src/qExecutor.c
+5
-5
include/common/tdatablock.h
include/common/tdatablock.h
+3
-2
include/libs/executor/executor.h
include/libs/executor/executor.h
+10
-61
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+8
-9
source/common/test/commonTests.cpp
source/common/test/commonTests.cpp
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+11
-11
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+17
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+227
-186
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+22
-16
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+2
-2
source/libs/executor/test/sortTests.cpp
source/libs/executor/test/sortTests.cpp
+2
-2
未找到文件。
2.0/src/query/inc/qExecutor.h
浏览文件 @
9b6f008c
...
...
@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo {
}
SMultiwayMergeInfo
;
// todo support the disk-based sort
typedef
struct
S
Order
OperatorInfo
{
typedef
struct
S
Sort
OperatorInfo
{
int32_t
colIndex
;
int32_t
order
;
SSDataBlock
*
pDataBlock
;
}
S
Order
OperatorInfo
;
}
S
Sort
OperatorInfo
;
void
appendUpstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pUpstream
);
...
...
@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t
numOfOutput
,
SColumnInfo
*
pCols
,
int32_t
numOfFilter
);
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pUpstream
,
int32_t
numOfUpstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
create
Order
OperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
);
SOperatorInfo
*
create
Sort
OperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
);
SSDataBlock
*
doGlobalAggregate
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doMultiwayMergeSort
(
void
*
param
,
bool
*
newgroup
);
...
...
2.0/src/query/src/qExecutor.c
浏览文件 @
9b6f008c
...
...
@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case
OP_Order
:
{
pRuntimeEnv
->
proot
=
create
Order
OperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
pQueryAttr
->
order
);
pRuntimeEnv
->
proot
=
create
Sort
OperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
pQueryAttr
->
order
);
break
;
}
...
...
@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return
NULL
;
}
S
Order
OperatorInfo
*
pInfo
=
pOperator
->
info
;
S
Sort
OperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pBlock
=
NULL
;
while
(
1
)
{
...
...
@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return
(
pInfo
->
pDataBlock
->
info
.
rows
>
0
)
?
pInfo
->
pDataBlock
:
NULL
;
}
SOperatorInfo
*
create
Order
OperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
)
{
S
OrderOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SOrder
OperatorInfo
));
SOperatorInfo
*
create
Sort
OperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
)
{
S
SortOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSort
OperatorInfo
));
{
SSDataBlock
*
pDataBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
...
...
@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
S
OrderOperatorInfo
*
pInfo
=
(
SOrder
OperatorInfo
*
)
param
;
S
SortOperatorInfo
*
pInfo
=
(
SSort
OperatorInfo
*
)
param
;
pInfo
->
pDataBlock
=
blockDataDestroy
(
pInfo
->
pDataBlock
);
}
...
...
include/common/tdatablock.h
浏览文件 @
9b6f008c
...
...
@@ -29,8 +29,9 @@ typedef struct SCorEpSet {
}
SCorEpSet
;
typedef
struct
SBlockOrderInfo
{
bool
nullFirst
;
int32_t
order
;
int32_t
colIndex
;
int32_t
slotId
;
SColumnInfoData
*
pColData
;
}
SBlockOrderInfo
;
...
...
@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema
*
blockDataExtractSchema
(
const
SSDataBlock
*
pBlock
,
int32_t
*
numOfCols
);
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
);
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataEnsureColumnCapacity
(
SColumnInfoData
*
pColumn
,
uint32_t
numOfRows
);
...
...
include/libs/executor/executor.h
浏览文件 @
9b6f008c
...
...
@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
*/
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
* @param pBlocks
* @param numOfInputBlock
* @param type
* @return
*/
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
void
**
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
/**
* Update the table id list, add or remove.
*
...
...
@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
*/
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param tinfo
* @return
*/
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
tinfo
,
bool
*
buildRes
,
void
*
pRspContext
);
/**
* kill the ongoing query and free the query handle and corresponding resources automatically
* @param tinfo qhandle
...
...
@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/
int32_t
qUpdateQueriedTableIdList
(
qTaskInfo_t
tinfo
,
int64_t
uid
,
int32_t
type
);
//================================================================================================
// query handle management
/**
* Query handle mgmt object
* @param vgId
* @return
*/
void
*
qOpenTaskMgmt
(
int32_t
vgId
);
/**
* broadcast the close information and wait for all query stop.
* @param pExecutor
*/
void
qTaskMgmtNotifyClosing
(
void
*
pExecutor
);
/**
* Re-open the query handle management module when opening the vnode again.
* @param pExecutor
*/
void
qQueryMgmtReOpen
(
void
*
pExecutor
);
/**
* Close query mgmt and clean up resources.
* @param pExecutor
*/
void
qCleanupTaskMgmt
(
void
*
pExecutor
);
/**
* Add the query into the query mgmt object
* @param pMgmt
* @param qId
* @param qInfo
* @return
*/
void
**
qRegisterTask
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
);
/**
* acquire the query handle according to the key from query mgmt object.
* @param pMgmt
* @param key
* @return
*/
void
**
qAcquireTask
(
void
*
pMgmt
,
uint64_t
key
);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
...
...
@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key);
*/
void
**
qReleaseTask
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
);
/**
* De-register the query handle from the management module and free it immediately.
* @param pMgmt
* @param pQInfo
* @return
*/
void
**
qDeregisterQInfo
(
void
*
pMgmt
,
void
*
pQInfo
);
void
qProcessFetchRsp
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
...
...
source/common/src/tdatablock.c
浏览文件 @
9b6f008c
...
...
@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
typedef
struct
SSDataBlockSortHelper
{
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
SSDataBlock
*
pDataBlock
;
bool
nullFirst
;
}
SSDataBlockSortHelper
;
int32_t
dataBlockCompar
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
...
...
@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
}
if
(
rightNull
)
{
return
p
Help
er
->
nullFirst
?
1
:
-
1
;
return
p
Ord
er
->
nullFirst
?
1
:
-
1
;
}
if
(
leftNull
)
{
return
p
Help
er
->
nullFirst
?
-
1
:
1
;
return
p
Ord
er
->
nullFirst
?
-
1
:
1
;
}
}
...
...
@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) {
}
}
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
)
{
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
)
{
ASSERT
(
pDataBlock
!=
NULL
&&
pOrderInfo
!=
NULL
);
if
(
pDataBlock
->
info
.
rows
<=
1
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOrderInfo
);
++
i
)
{
SBlockOrderInfo
*
pInfo
=
taosArrayGet
(
pOrderInfo
,
i
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pInfo
->
colIndex
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pInfo
->
slotId
);
if
(
pColInfoData
->
hasNull
)
{
sortColumnHasNull
=
true
;
}
...
...
@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
int64_t
p0
=
taosGetTimestampUs
();
SSDataBlockSortHelper
helper
=
{.
nullFirst
=
nullFirst
,
.
pDataBlock
=
pDataBlock
,
.
orderInfo
=
pOrderInfo
};
SSDataBlockSortHelper
helper
=
{.
pDataBlock
=
pDataBlock
,
.
orderInfo
=
pOrderInfo
};
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
helper
.
orderInfo
);
++
i
)
{
struct
SBlockOrderInfo
*
pInfo
=
taosArrayGet
(
helper
.
orderInfo
,
i
);
pInfo
->
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pInfo
->
colIndex
);
pInfo
->
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pInfo
->
slotId
);
}
taosqsort
(
index
,
rows
,
sizeof
(
int32_t
),
&
helper
,
dataBlockCompar
);
...
...
@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SBlockOrderInfo
*
pInfo
=
taosArrayGet
(
pOrderInfo
,
i
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
colIndex
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
slotId
);
pInfo
->
pColData
=
pColInfo
;
sortValLengthPerRow
+=
pColInfo
->
info
.
bytes
;
}
...
...
@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// Allocate the additional buffer.
int64_t
p0
=
taosGetTimestampUs
();
SSDataBlockSortHelper
helper
=
{.
nullFirst
=
nullFirst
,
.
pDataBlock
=
pDataBlock
,
.
orderInfo
=
pOrderInfo
};
SSDataBlockSortHelper
helper
=
{.
pDataBlock
=
pDataBlock
,
.
orderInfo
=
pOrderInfo
};
uint32_t
rows
=
pDataBlock
->
info
.
rows
;
SHelper
*
index
=
createTupleIndex_rv
(
rows
,
helper
.
orderInfo
,
pDataBlock
);
...
...
source/common/test/commonTests.cpp
浏览文件 @
9b6f008c
...
...
@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) {
printf
(
"the second row of binary:%s, length:%d
\n
"
,
(
char
*
)
varDataVal
(
pData
),
varDataLen
(
pData
));
SArray
*
pOrderInfo
=
taosArrayInit
(
3
,
sizeof
(
SBlockOrderInfo
));
SBlockOrderInfo
order
=
{.
order
=
TSDB_ORDER_ASC
,
.
colIndex
=
0
};
SBlockOrderInfo
order
=
{.
nullFirst
=
true
,
.
order
=
TSDB_ORDER_ASC
,
.
slotId
=
0
};
taosArrayPush
(
pOrderInfo
,
&
order
);
blockDataSort
(
b
,
pOrderInfo
,
true
);
blockDataSort
(
b
,
pOrderInfo
);
blockDataDestroy
(
b
);
taosArrayDestroy
(
pOrderInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9b6f008c
...
...
@@ -430,9 +430,10 @@ typedef struct STagScanInfo {
}
STagScanInfo
;
typedef
struct
SStreamBlockScanInfo
{
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
int32_t
blockType
;
// current block type
bool
blockValid
;
// Is current data has returned?
int32_t
validBlockIndex
;
// Is current data has returned?
SColumnInfo
*
pCols
;
// the output column info
uint64_t
numOfRows
;
// total scanned rows
uint64_t
numOfExec
;
// execution times
...
...
@@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo {
typedef
struct
SSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SGroupResInfo
groupResInfo
;
STimeWindow
curWindow
;
// current time window
TSKEY
prevTs
;
// previous timestamp
int32_t
numOfRows
;
// number of rows
int32_t
start
;
// start row index
bool
reptScan
;
// next round scan
int64_t
gap
;
// session window gap
}
SSessionAggOperatorInfo
;
typedef
struct
SStateWindowOperatorInfo
{
...
...
@@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo
binfo
;
bool
hasVarCol
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
nullFirst
;
SArray
*
pSortInfo
;
int32_t
numOfSources
;
SSortHandle
*
pSortHandle
;
...
...
@@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo {
SAggSupporter
aggSup
;
}
SSortedMergeOperatorInfo
;
typedef
struct
S
Order
OperatorInfo
{
typedef
struct
S
Sort
OperatorInfo
{
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SSDataBlock
*
pDataBlock
;
bool
hasVarCol
;
// has variable length column, such as binary/varchar/nchar
SArray
*
orderInfo
;
bool
nullFirst
;
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
int32_t
bufPageSize
;
int32_t
numOfRowsInRes
;
...
...
@@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo {
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
S
Order
OperatorInfo
;
}
S
Sort
OperatorInfo
;
typedef
struct
SDistinctDataInfo
{
int32_t
index
;
...
...
@@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createOrderOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
p
OrderVal
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
p
SortInfo
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
pSysTableReadHandle
,
SSDataBlock
*
pResBlock
,
const
SName
*
pName
,
SNode
*
pCondition
,
SEpSet
epset
,
SArray
*
colList
,
SExecTaskInfo
*
pTaskInfo
,
bool
showRewrite
,
int32_t
accountId
);
SOperatorInfo
*
createLimitOperatorInfo
(
SOperatorInfo
*
downstream
,
SLimit
*
pLimit
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
...
...
source/libs/executor/inc/tsort.h
浏览文件 @
9b6f008c
...
...
@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type
* @return
*/
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
pOrderInfo
,
bool
nullFirst
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
const
char
*
idstr
);
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
pOrderInfo
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSDataBlock
*
pBlock
,
const
char
*
idstr
);
/**
*
...
...
source/libs/executor/src/executil.c
浏览文件 @
9b6f008c
...
...
@@ -19,8 +19,6 @@
#include "executil.h"
#include "executorimpl.h"
//#include "queryLog.h"
#include "tbuffer.h"
#include "tcompression.h"
#include "tlosertree.h"
...
...
source/libs/executor/src/executor.c
浏览文件 @
9b6f008c
...
...
@@ -14,11 +14,12 @@
*/
#include "executor.h"
#include "tdatablock.h"
#include "executorimpl.h"
#include "planner.h"
#include "vnode.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
int32_t
type
,
char
*
id
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
...
...
@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
return
TSDB_CODE_QRY_APP_ERROR
;
}
pOperator
->
status
=
OP_NOT_OPENED
;
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
type
,
id
);
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
numOfBlocks
,
type
,
id
);
}
else
{
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
}
if
(
type
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
,
0
)
<
0
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
[
0
]
,
0
)
<
0
)
{
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
else
{
ASSERT
(
!
pInfo
->
blockValid
);
for
(
int32_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
input
[
i
];
SSDataBlock
*
pDataBlock
=
input
;
pInfo
->
pRes
->
info
=
pDataBlock
->
info
;
taosArrayClear
(
pInfo
->
pRes
->
pDataBlock
);
taosArrayAddAll
(
pInfo
->
pRes
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
SSDataBlock
*
p
=
createOneDataBlock
(
pDataBlock
);
p
->
info
=
pDataBlock
->
info
;
// set current block valid.
pInfo
->
blockValid
=
true
;
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
}
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
)
{
qSetMultiStreamInput
(
tinfo
,
(
void
**
)
&
input
,
1
,
type
);
}
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
void
**
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
)
{
if
(
tinfo
==
NULL
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
if
(
input
==
NULL
)
{
if
(
pBlocks
==
NULL
||
numOfBlocks
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
*
)
input
,
type
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
*
*
)
pBlocks
,
numOfBlocks
,
type
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
9b6f008c
...
...
@@ -69,6 +69,7 @@ typedef enum SResultTsInterpType {
typedef
struct
SColMatchInfo
{
int32_t
colId
;
int32_t
targetSlotId
;
bool
output
;
}
SColMatchInfo
;
#if 0
...
...
@@ -322,26 +323,6 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEn
}
//setup the output buffer for each operator
SSDataBlock
*
createOutputBuf
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
numOfRows
)
{
const
static
int32_t
minSize
=
8
;
SSDataBlock
*
res
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
res
->
info
.
numOfCols
=
numOfOutput
;
res
->
pDataBlock
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SColumnInfoData
));
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SColumnInfoData
idata
=
{{
0
}};
idata
.
info
.
type
=
pExpr
[
i
].
base
.
resSchema
.
type
;
idata
.
info
.
bytes
=
pExpr
[
i
].
base
.
resSchema
.
bytes
;
idata
.
info
.
colId
=
pExpr
[
i
].
base
.
resSchema
.
colId
;
int32_t
size
=
TMAX
(
idata
.
info
.
bytes
*
numOfRows
,
minSize
);
idata
.
pData
=
taosMemoryCalloc
(
1
,
size
);
// at least to hold a pointer on x64 platform
taosArrayPush
(
res
->
pDataBlock
,
&
idata
);
}
return
res
;
}
SSDataBlock
*
createOutputBuf_rv1
(
SDataBlockDescNode
*
pNode
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pNode
->
pSlots
);
...
...
@@ -350,11 +331,15 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
blockId
=
pNode
->
dataBlockId
;
pBlock
->
info
.
rowSize
=
pNode
->
resultRowSize
;
pBlock
->
info
.
rowSize
=
pNode
->
resultRowSize
;
// todo ??
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
idata
=
{{
0
}};
SSlotDescNode
*
pDescNode
=
nodesListGetNode
(
pNode
->
pSlots
,
i
);
if
(
!
pDescNode
->
output
)
{
continue
;
}
idata
.
info
.
type
=
pDescNode
->
dataType
.
type
;
idata
.
info
.
bytes
=
pDescNode
->
dataType
.
bytes
;
idata
.
info
.
scale
=
pDescNode
->
dataType
.
scale
;
...
...
@@ -1813,25 +1798,27 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
}
}
static
void
doSessionWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SSessionAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
)
{
STaskRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
S
TableQueryInfo
*
item
=
pRuntimeEnv
->
current
;
// todo handle multiple tables cases.
static
void
doSessionWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SSessionAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
S
ExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
// primary timestamp column
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
p
SData
Block
->
pDataBlock
,
0
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
bool
masterScan
=
IS_MAIN_SCAN
(
pRuntimeEnv
);
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
bool
masterScan
=
true
;
STimeWindow
window
=
{
0
};
int32_t
numOfOutput
=
pOperator
->
numOfOutput
;
int64_t
gid
=
pBlock
->
info
.
groupId
;
int64_t
gap
=
p
Operator
->
pRuntimeEnv
->
pQueryAttr
->
sw
.
gap
;
int64_t
gap
=
p
Info
->
gap
;
pInfo
->
numOfRows
=
0
;
if
(
IS_REPEAT_SCAN
(
pRuntimeEnv
)
&&
!
pInfo
->
reptScan
)
{
if
(
/*IS_REPEAT_SCAN(pRuntimeEnv) && */
!
pInfo
->
reptScan
)
{
pInfo
->
reptScan
=
true
;
pInfo
->
prevTs
=
INT64_MIN
;
}
TSKEY
*
tsList
=
(
TSKEY
*
)
pColInfoData
->
pData
;
for
(
int32_t
j
=
0
;
j
<
p
SData
Block
->
info
.
rows
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
if
(
pInfo
->
prevTs
==
INT64_MIN
)
{
pInfo
->
curWindow
.
skey
=
tsList
[
j
];
pInfo
->
curWindow
.
ekey
=
tsList
[
j
];
...
...
@@ -1848,17 +1835,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
}
else
{
// start a new session window
SResultRow
*
pResult
=
NULL
;
pInfo
->
curWindow
.
ekey
=
pInfo
->
curWindow
.
skey
;
int32_t
ret
=
setResultOutputBufByKey
(
pRuntimeEnv
,
&
pBInfo
->
resultRowInfo
,
pSDataBlock
->
info
.
uid
,
&
pInfo
->
curWindow
,
masterScan
,
&
pResult
,
item
->
groupIndex
,
pBInfo
->
pCtx
,
pOperator
->
numOfOutput
,
pBInfo
->
rowCellInfoOffset
);
int32_t
ret
=
setResultOutputBufByKey_rv
(
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
->
info
.
uid
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
p
RuntimeEnv
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
longjmp
(
p
TaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
// pSDataBlock->info.rows, pOperator->numOfOutput
);
// pInfo->numOfRows data belong to the current session window
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
window
,
pInfo
->
start
,
pInfo
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
pInfo
->
curWindow
.
skey
=
tsList
[
j
];
pInfo
->
curWindow
.
ekey
=
tsList
[
j
];
...
...
@@ -1871,15 +1856,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SResultRow
*
pResult
=
NULL
;
pInfo
->
curWindow
.
ekey
=
pInfo
->
curWindow
.
skey
;
int32_t
ret
=
setResultOutputBufByKey
(
pRuntimeEnv
,
&
pBInfo
->
resultRowInfo
,
pSDataBlock
->
info
.
uid
,
&
pInfo
->
curWindow
,
masterScan
,
&
pResult
,
item
->
groupIndex
,
pBInfo
->
pCtx
,
pOperator
->
numOfOutput
,
pBInfo
->
rowCellInfoOffset
);
int32_t
ret
=
setResultOutputBufByKey_rv
(
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
->
info
.
uid
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
p
RuntimeEnv
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
longjmp
(
p
TaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
// pSDataBlock->info.rows, pOperator->numOfOutput);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
window
,
pInfo
->
start
,
pInfo
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
}
static
void
setResultRowKey
(
SResultRow
*
pResultRow
,
char
*
pData
,
int16_t
type
)
{
...
...
@@ -2883,16 +2866,19 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
return
terrno
;
}
int32_t
numOfCols
=
pBlock
->
info
.
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pCols
,
i
);
SColMatchInfo
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
pColMatchInfo
,
i
);
ASSERT
(
pColMatchInfo
->
colId
==
p
->
info
.
colId
);
taosArraySet
(
pBlock
->
pDataBlock
,
pColMatchInfo
->
targetSlotId
,
p
);
int32_t
numOfCols
=
pBlock
->
info
.
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pCols
,
i
);
SColMatchInfo
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
output
)
{
continue
;
}
return
TSDB_CODE_SUCCESS
;
ASSERT
(
pColMatchInfo
->
colId
==
p
->
info
.
colId
);
taosArraySet
(
pBlock
->
pDataBlock
,
pColMatchInfo
->
targetSlotId
,
p
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
loadDataBlockOnDemand
(
SExecTaskInfo
*
pTaskInfo
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
...
...
@@ -4738,6 +4724,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) {
#endif
}
static
void
doClearBufferedBlocks
(
SStreamBlockScanInfo
*
pInfo
)
{
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
pInfo
->
validBlockIndex
=
0
;
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
SSDataBlock
*
p
=
taosArrayGet
(
pInfo
->
pBlockLists
,
i
);
blockDataDestroy
(
p
);
}
taosArrayClear
(
pInfo
->
pBlockLists
);
}
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -4750,43 +4747,45 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup)
}
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
if
(
pInfo
->
blockValid
)
{
pInfo
->
blockValid
=
false
;
// this block can only be used once.
return
pInfo
->
pRes
;
}
else
{
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
if
(
pInfo
->
validBlockIndex
>=
total
)
{
doClearBufferedBlocks
(
pInfo
);
return
NULL
;
}
}
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
blockDataCleanup
(
pInfo
->
pRes
);
int32_t
current
=
pInfo
->
validBlockIndex
++
;
return
taosArrayGet
(
pInfo
->
pBlockLists
,
current
);
}
else
{
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
blockDataCleanup
(
pInfo
->
pRes
);
while
(
tqNextDataBlock
(
pInfo
->
readerHandle
))
{
pTaskInfo
->
code
=
tqRetrieveDataBlockInfo
(
pInfo
->
readerHandle
,
pBlockInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pTaskInfo
->
code
;
return
NULL
;
}
while
(
tqNextDataBlock
(
pInfo
->
readerHandle
))
{
pTaskInfo
->
code
=
tqRetrieveDataBlockInfo
(
pInfo
->
readerHandle
,
pBlockInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pTaskInfo
->
code
;
return
NULL
;
}
if
(
pBlockInfo
->
rows
==
0
)
{
return
NULL
;
}
if
(
pBlockInfo
->
rows
==
0
)
{
return
NULL
;
}
pInfo
->
pRes
->
pDataBlock
=
tqRetrieveDataBlock
(
pInfo
->
readerHandle
);
if
(
pInfo
->
pRes
->
pDataBlock
==
NULL
)
{
// TODO add log
pTaskInfo
->
code
=
terrno
;
return
NULL
;
}
pInfo
->
pRes
->
pDataBlock
=
tqRetrieveDataBlock
(
pInfo
->
readerHandle
);
if
(
pInfo
->
pRes
->
pDataBlock
==
NULL
)
{
// TODO add log
pTaskInfo
->
code
=
terrno
;
return
NULL
;
}
break
;
}
break
;
}
// record the scan action.
pInfo
->
numOfExec
++
;
pInfo
->
numOfRows
+=
pBlockInfo
->
rows
;
// record the scan action.
pInfo
->
numOfExec
++
;
pInfo
->
numOfRows
+=
pBlockInfo
->
rows
;
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
}
int32_t
loadRemoteDataCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
...
...
@@ -5423,6 +5422,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
return
NULL
;
}
pInfo
->
pBlockLists
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pInfo
->
pBlockLists
==
NULL
)
{
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
pInfo
->
readerHandle
=
streamReadHandle
;
pInfo
->
pRes
=
pResBlock
;
...
...
@@ -5840,14 +5846,14 @@ static void cleanupAggSup(SAggSupporter* pAggSup);
static
void
destroySortedMergeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSortedMergeOperatorInfo
*
pInfo
=
(
SSortedMergeOperatorInfo
*
)
param
;
taosArrayDestroy
(
pInfo
->
order
Info
);
taosArrayDestroy
(
pInfo
->
pSort
Info
);
taosArrayDestroy
(
pInfo
->
groupInfo
);
if
(
pInfo
->
pSortHandle
!=
NULL
)
{
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
}
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
}
...
...
@@ -6105,12 +6111,10 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
hasVarCol
,
pInfo
->
binfo
.
capacity
);
}
SSchema
*
p
=
blockDataExtractSchema
(
pInfo
->
binfo
.
pRes
,
NULL
);
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
orderInfo
,
pInfo
->
nullFirst
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
p
,
pInfo
->
binfo
.
pRes
->
info
.
numOfCol
s
,
"GET_TASKID(pTaskInfo)"
);
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
p
Info
->
binfo
.
pRe
s
,
"GET_TASKID(pTaskInfo)"
);
taosMemoryFreeClear
(
p
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
);
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
...
...
@@ -6128,29 +6132,6 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
return
doMerge
(
pOperator
);
}
static
SArray
*
createBlockOrder
(
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
pOrderVal
)
{
SArray
*
pOrderInfo
=
taosArrayInit
(
1
,
sizeof
(
SBlockOrderInfo
));
size_t
numOfOrder
=
taosArrayGetSize
(
pOrderVal
);
for
(
int32_t
j
=
0
;
j
<
numOfOrder
;
++
j
)
{
SBlockOrderInfo
orderInfo
=
{
0
};
SOrder
*
pOrder
=
taosArrayGet
(
pOrderVal
,
j
);
orderInfo
.
order
=
pOrder
->
order
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExpr
=
&
pExprInfo
[
i
];
if
(
pExpr
->
base
.
resSchema
.
colId
==
pOrder
->
col
.
colId
)
{
orderInfo
.
colIndex
=
i
;
break
;
}
}
taosArrayPush
(
pOrderInfo
,
&
orderInfo
);
}
return
pOrderInfo
;
}
static
int32_t
initGroupCol
(
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
pGroupInfo
,
SSortedMergeOperatorInfo
*
pInfo
)
{
if
(
pGroupInfo
==
NULL
||
taosArrayGetSize
(
pGroupInfo
)
==
0
)
{
return
0
;
...
...
@@ -6199,7 +6180,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
p
OrderVal
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
p
SortInfo
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SSortedMergeOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortedMergeOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -6228,7 +6209,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
// pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo
->
sortBufSize
=
1024
*
16
;
// 1MB
pInfo
->
bufPageSize
=
1024
;
pInfo
->
orderInfo
=
createBlockOrder
(
pExprInfo
,
num
,
pOrderVal
)
;
pInfo
->
pSortInfo
=
pSortInfo
;
pInfo
->
binfo
.
capacity
=
blockDataGetCapacityInRow
(
pInfo
->
binfo
.
pRes
,
pInfo
->
bufPageSize
);
...
...
@@ -6268,35 +6249,34 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SOrderOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
hasVarCol
=
pInfo
->
pDataBlock
->
info
.
hasVarCol
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
hasVarCol
,
pInfo
->
numOfRowsInRes
);
}
SSchema
*
p
=
blockDataExtractSchema
(
pInfo
->
pDataBlock
,
NULL
);
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
orderInfo
,
pInfo
->
nullFirst
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
p
,
pInfo
->
pDataBlock
->
info
.
numOfCols
,
"GET_TASKID(pTaskInfo)"
);
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
p
Info
->
pDataBlock
,
"GET_TASKID(pTaskInfo)"
);
taosMemoryFreeClear
(
p
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
);
SGenericSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SGenericSource
));
ps
->
param
=
pOperator
;
ps
->
param
=
pOperator
->
pDownstream
[
0
]
;
tsortAddSource
(
pInfo
->
pSortHandle
,
ps
);
// TODO set error code;
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
hasVarCol
,
pInfo
->
numOfRowsInRes
);
}
SOperatorInfo
*
create
OrderOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
)
{
S
OrderOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SOrder
OperatorInfo
));
SOperatorInfo
*
create
SortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
S
SortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSort
OperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
taosMemoryFreeClear
(
pInfo
);
...
...
@@ -6305,37 +6285,21 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return
NULL
;
}
pInfo
->
sortBufSize
=
1024
*
16
;
// 1MB
pInfo
->
bufPageSize
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
orderInfo
=
createBlockOrder
(
pExprInfo
,
numOfCols
,
pOrderVal
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
(
pExprInfo
[
i
].
base
.
resSchema
.
type
))
{
pInfo
->
hasVarCol
=
true
;
break
;
}
}
if
(
pInfo
->
orderInfo
==
NULL
||
pInfo
->
pDataBlock
==
NULL
)
{
taosMemoryFreeClear
(
pOperator
);
destroyOrderOperatorInfo
(
pInfo
,
numOfCols
);
taosMemoryFreeClear
(
pInfo
);
pInfo
->
sortBufSize
=
1024
*
16
;
// 1MB, TODO dynamic set the available sort buffer
pInfo
->
bufPageSize
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
pDataBlock
=
pResBlock
;
pInfo
->
pSortInfo
=
pSortInfo
;
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
pOperator
->
name
=
"Order"
;
pOperator
->
name
=
"Sort"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
getNextFn
=
doSort
;
pOperator
->
closeFn
=
destroyOrderOperatorInfo
;
pOperator
->
getNextFn
=
doSort
;
pOperator
->
closeFn
=
destroyOrderOperatorInfo
;
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -6496,7 +6460,6 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l
offset
+=
valueLen
;
initResultRow
(
resultRow
);
pInfo
->
resultRowInfo
.
pPosition
[
pInfo
->
resultRowInfo
.
size
++
]
=
(
SResultRowPosition
)
{.
pageId
=
resultRow
->
pageId
,
.
offset
=
resultRow
->
offset
};
}
...
...
@@ -6741,7 +6704,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
STableIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
// int32_t order = pQueryAttr->order.order
;
int32_t
order
=
TSDB_ORDER_ASC
;
// STimeWindow win = pQueryAttr->window;
bool
newgroup
=
false
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -6758,7 +6721,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
TSDB_ORDER_ASC
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
0
);
}
...
...
@@ -6770,7 +6733,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
return
TSDB_CODE_SUCCESS
;
}
static
SSDataBlock
*
do
IntervalAgg
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
do
BuildIntervalResult
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
STableIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -7102,13 +7065,14 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup)
return
NULL
;
}
SSessionAggOperatorInfo
*
p
Window
Info
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
p
Window
Info
->
binfo
;
SSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
toSDatablock
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
,
pBInfo
->
pRes
,
pBInfo
->
capacity
,
pBInfo
->
rowCellInfoOffset
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
return
pBInfo
->
pRes
;
...
...
@@ -7127,19 +7091,20 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup)
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
order
);
doSessionWindowAggImpl
(
pOperator
,
p
Window
Info
,
pBlock
);
doSessionWindowAggImpl
(
pOperator
,
pInfo
,
pBlock
);
}
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult
(
pBInfo
->
pCtx
,
pOperator
->
numOfOutput
);
finalizeMultiTupleQueryResult
(
pBInfo
->
pCtx
,
pOperator
->
numOfOutput
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
rowCellInfoOffset
);
// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
initGroupResInfo
(
&
pInfo
->
groupResInfo
,
&
pBInfo
->
resultRowInfo
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pBInfo
->
capacity
);
toSDatablock
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
,
pBInfo
->
pRes
,
pBInfo
->
capacity
,
pBInfo
->
rowCellInfoOffset
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
...
...
@@ -7505,10 +7470,10 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
S
OrderOperatorInfo
*
pInfo
=
(
SOrder
OperatorInfo
*
)
param
;
S
SortOperatorInfo
*
pInfo
=
(
SSort
OperatorInfo
*
)
param
;
pInfo
->
pDataBlock
=
blockDataDestroy
(
pInfo
->
pDataBlock
);
taosArrayDestroy
(
pInfo
->
order
Info
);
taosArrayDestroy
(
pInfo
->
pSort
Info
);
}
static
void
destroyConditionOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
@@ -7716,7 +7681,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
_openFn
=
doOpenIntervalAgg
;
pOperator
->
getNextFn
=
do
IntervalAgg
;
pOperator
->
getNextFn
=
do
BuildIntervalResult
;
pOperator
->
closeFn
=
destroyIntervalOperatorInfo
;
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -7782,25 +7747,27 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
return
pOperator
;
}
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
)
{
SSessionAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSessionAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
int32_t
code
=
doInitAggInfoSup
(
&
pInfo
->
aggSup
,
pInfo
->
binfo
.
pCtx
,
numOfCols
,
pTaskInfo
->
id
.
str
);
int32_t
numOfRows
=
4096
;
int32_t
code
=
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
numOfRows
,
pResBlock
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
pInfo
->
gap
=
gap
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
pOperator
->
name
=
"SessionWindowAggOperator"
;
// pOperator->operatorType = OP_SessionWindow
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
...
...
@@ -8035,7 +8002,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
offset
+=
pExpr
[
index
->
colIndex
].
base
.
resSchema
.
bytes
;
}
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pOperator
->
resultInfo
.
capacity
);
//
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
pOperator
->
name
=
"SLimitOperator"
;
...
...
@@ -8328,7 +8295,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pInfo
->
outputCapacity
=
4096
;
pInfo
->
pDistinctDataInfo
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SDistinctDataInfo
));
pInfo
->
pSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
(
int32_t
)
pInfo
->
outputCapacity
);
//
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -8535,16 +8502,17 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
static
SArray
*
extractTableIdList
(
const
STableGroupInfo
*
pTableGroupInfo
);
static
SArray
*
extractScanColumnId
(
SNodeList
*
pNodeList
);
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
)
{
if
(
pPhyNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pPhyNode
->
pChildren
)
==
0
)
{
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
nodeType
(
pPhyNode
))
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
size_t
numOfCols
=
LIST_LENGTH
(
pScanPhyNode
->
pScanCols
)
;
int32_t
numOfCols
=
0
;
tsdbReaderT
pDataReader
=
doCreateDataReader
((
STableScanPhysiNode
*
)
pPhyNode
,
pHandle
,
pTableGroupInfo
,
(
uint64_t
)
queryId
,
taskId
);
SArray
*
pColList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
);
SArray
*
pColList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pScanPhyNode
->
node
.
pOutputDataBlockDesc
,
&
numOfCols
);
return
createTableScanOperatorInfo
(
pDataReader
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pColList
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
nodeType
(
pPhyNode
))
{
...
...
@@ -8583,15 +8551,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t
size
=
LIST_LENGTH
(
pPhyNode
->
pChildren
);
assert
(
size
==
1
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(((
SProjectPhysiNode
*
)
pPhyNode
)
->
pProjections
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
return
createProjectOperatorInfo
(
op
,
pExprInfo
,
num
,
pResBlock
,
pTaskInfo
);
}
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(((
SProjectPhysiNode
*
)
pPhyNode
)
->
pProjections
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
return
createProjectOperatorInfo
(
op
,
pExprInfo
,
num
,
pResBlock
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_AGG
==
nodeType
(
pPhyNode
))
{
size_t
size
=
LIST_LENGTH
(
pPhyNode
->
pChildren
);
assert
(
size
==
1
);
...
...
@@ -8623,16 +8589,46 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
//todo: set the correct primary timestamp key column
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
};
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
};
return
createIntervalOperatorInfo
(
op
,
pExprInfo
,
num
,
pResBlock
,
&
interval
,
pTableGroupInfo
,
pTaskInfo
);
}
}
/*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
nodeType
(
pPhyNode
))
{
size_t
size
=
LIST_LENGTH
(
pPhyNode
->
pChildren
);
assert
(
size
==
1
);
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
SSortPhysiNode
*
pSortPhyNode
=
(
SSortPhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
SArray
*
info
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
);
return
createSortOperatorInfo
(
op
,
pResBlock
,
info
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
==
nodeType
(
pPhyNode
))
{
size_t
size
=
LIST_LENGTH
(
pPhyNode
->
pChildren
);
assert
(
size
==
1
);
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
SSessionWinodwPhysiNode
*
pSessionNode
=
(
SSessionWinodwPhysiNode
*
)
pPhyNode
;
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSessionNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
return
createSessionAggOperatorInfo
(
op
,
pExprInfo
,
num
,
pResBlock
,
pSessionNode
->
gap
,
pTaskInfo
);
}
else
{
ASSERT
(
0
);
}
/*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
...
...
@@ -8725,7 +8721,39 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return
pList
;
}
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
)
{
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
if
(
pList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
pList
;
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pNodeList
,
i
);
SOrderByExprNode
*
pSortKey
=
(
SOrderByExprNode
*
)
pNode
->
pExpr
;
SBlockOrderInfo
bi
=
{
0
};
bi
.
order
=
(
pSortKey
->
order
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
bi
.
nullFirst
=
(
pSortKey
->
nullOrder
==
NULL_ORDER_FIRST
);
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pSortKey
->
pExpr
;
bi
.
slotId
=
pColNode
->
slotId
;
// pColNode->order;
// SColumn c = {0};
// c.slotId = pColNode->slotId;
// c.colId = pColNode->colId;
// c.type = pColNode->node.resType.type;
// c.bytes = pColNode->node.resType.bytes;
// c.precision = pColNode->node.resType.precision;
// c.scale = pColNode->node.resType.scale;
taosArrayPush
(
pList
,
&
bi
);
}
return
pList
;
}
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColMatchInfo
));
if
(
pList
==
NULL
)
{
...
...
@@ -8738,12 +8766,25 @@ SArray* extractColMatchInfo(SNodeList* pNodeList) {
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SColMatchInfo
c
=
{
0
};
c
.
colId
=
pColNode
->
colId
;
c
.
colId
=
pColNode
->
colId
;
c
.
targetSlotId
=
pNode
->
slotId
;
c
.
output
=
true
;
taosArrayPush
(
pList
,
&
c
);
}
*
numOfOutputCols
=
0
;
int32_t
num
=
LIST_LENGTH
(
pOutputNodeList
->
pSlots
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SSlotDescNode
*
pNode
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pOutputNodeList
->
pSlots
,
i
);
SColMatchInfo
*
info
=
taosArrayGet
(
pList
,
pNode
->
slotId
);
// if (pNode->output) {
(
*
numOfOutputCols
)
+=
1
;
// } else {
// info->output = false;
// }
}
return
pList
;
}
...
...
source/libs/executor/src/tsort.c
浏览文件 @
9b6f008c
...
...
@@ -23,20 +23,19 @@
#include "tsort.h"
#include "tutil.h"
typedef
struct
STupleHandle
{
struct
STupleHandle
{
SSDataBlock
*
pBlock
;
int32_t
rowIndex
;
}
STupleHandle
;
};
typedef
struct
SSortHandle
{
struct
SSortHandle
{
int32_t
type
;
int32_t
pageSize
;
int32_t
numOfPages
;
SDiskbasedBuf
*
pBuf
;
SArray
*
pOrderInfo
;
bool
nullFirst
;
SArray
*
pSortInfo
;
SArray
*
pOrderedSource
;
_sort_fetch_block_fn_t
fetchfp
;
...
...
@@ -60,7 +59,7 @@ typedef struct SSortHandle {
bool
inMemSort
;
bool
needAdjust
;
STupleHandle
tupleHandle
;
}
SSortHandle
;
};
static
int32_t
msortComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
);
...
...
@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
* @param type
* @return
*/
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
p
OrderInfo
,
bool
nullFirst
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
const
char
*
idstr
)
{
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
p
SortInfo
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSDataBlock
*
pBlock
,
const
char
*
idstr
)
{
SSortHandle
*
pSortHandle
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortHandle
));
pSortHandle
->
type
=
type
;
pSortHandle
->
pageSize
=
pageSize
;
pSortHandle
->
numOfPages
=
numOfPages
;
pSortHandle
->
pOrderedSource
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pSortHandle
->
pOrderInfo
=
pOrderInfo
;
pSortHandle
->
nullFirst
=
nullFirst
;
pSortHandle
->
cmpParam
.
orderInfo
=
pOrderInfo
;
pSortHandle
->
pSortInfo
=
pSortInfo
;
pSortHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
);
pSortHandle
->
pOrderedSource
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pSortHandle
->
cmpParam
.
orderInfo
=
pSortInfo
;
pSortHandle
->
pDataBlock
=
createDataBlock_rv
(
pSchema
,
numOfCols
);
tsortSetComparFp
(
pSortHandle
,
msortComparFn
);
if
(
idstr
!=
NULL
)
{
...
...
@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
for
(
int32_t
i
=
0
;
i
<
pInfo
->
size
;
++
i
)
{
SBlockOrderInfo
*
pOrder
=
TARRAY_GET_ELEM
(
pInfo
,
i
);
SColumnInfoData
*
pLeftColInfoData
=
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
colIndex
);
SColumnInfoData
*
pLeftColInfoData
=
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
leftNull
=
false
;
if
(
pLeftColInfoData
->
hasNull
)
{
leftNull
=
colDataIsNull
(
pLeftColInfoData
,
pLeftBlock
->
info
.
rows
,
pLeftSource
->
src
.
rowIndex
,
pLeftBlock
->
pBlockAgg
);
}
SColumnInfoData
*
pRightColInfoData
=
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
colIndex
);
SColumnInfoData
*
pRightColInfoData
=
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
rightNull
=
false
;
if
(
pRightColInfoData
->
hasNull
)
{
rightNull
=
colDataIsNull
(
pRightColInfoData
,
pRightBlock
->
info
.
rows
,
pRightSource
->
src
.
rowIndex
,
pRightBlock
->
pBlockAgg
);
...
...
@@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
static
int32_t
doInternalMergeSort
(
SSortHandle
*
pHandle
)
{
size_t
numOfSources
=
taosArrayGetSize
(
pHandle
->
pOrderedSource
);
if
(
numOfSources
==
0
)
{
return
0
;
}
// Calculate the I/O counts to complete the data sort.
double
sortPass
=
floorl
(
log2
(
numOfSources
)
/
log2
(
pHandle
->
numOfPages
));
...
...
@@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if
(
size
>
sortBufSize
)
{
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t
p
=
taosGetTimestampUs
();
blockDataSort
(
pHandle
->
pDataBlock
,
pHandle
->
p
OrderInfo
,
pHandle
->
nullFirst
);
blockDataSort
(
pHandle
->
pDataBlock
,
pHandle
->
p
SortInfo
);
int64_t
el
=
taosGetTimestampUs
()
-
p
;
pHandle
->
sortElapsed
+=
el
;
...
...
@@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t
size
=
blockDataGetSize
(
pHandle
->
pDataBlock
);
// Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort
(
pHandle
->
pDataBlock
,
pHandle
->
p
OrderInfo
,
pHandle
->
nullFirst
);
blockDataSort
(
pHandle
->
pDataBlock
,
pHandle
->
p
SortInfo
);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if
(
size
<=
sortBufSize
)
{
...
...
@@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) {
ASSERT
(
numOfSources
<=
getNumOfInMemBufPages
(
pHandle
->
pBuf
));
}
if
(
numOfSources
==
0
)
{
return
0
;
}
code
=
sortComparInit
(
&
pHandle
->
cmpParam
,
pHandle
->
pOrderedSource
,
0
,
numOfSources
-
1
,
pHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
9b6f008c
...
...
@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = create
Order
OperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
SOperatorInfo* pOperator = create
Sort
OperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
...
...
@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = create
Order
OperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
SOperatorInfo* pOperator = create
Sort
OperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = NULL;
...
...
source/libs/executor/test/sortTests.cpp
浏览文件 @
9b6f008c
...
...
@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
for
(
int32_t
i
=
0
;
i
<
pInfo
->
size
;
++
i
)
{
SBlockOrderInfo
*
pOrder
=
(
SBlockOrderInfo
*
)
TARRAY_GET_ELEM
(
pInfo
,
i
);
SColumnInfoData
*
pLeftColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
colIndex
);
SColumnInfoData
*
pLeftColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
leftNull
=
false
;
if
(
pLeftColInfoData
->
hasNull
)
{
leftNull
=
colDataIsNull
(
pLeftColInfoData
,
pLeftBlock
->
info
.
rows
,
pLeftSource
->
src
.
rowIndex
,
pLeftBlock
->
pBlockAgg
);
}
SColumnInfoData
*
pRightColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
colIndex
);
SColumnInfoData
*
pRightColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
rightNull
=
false
;
if
(
pRightColInfoData
->
hasNull
)
{
rightNull
=
colDataIsNull
(
pRightColInfoData
,
pRightBlock
->
info
.
rows
,
pRightSource
->
src
.
rowIndex
,
pRightBlock
->
pBlockAgg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录