Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1ed388c0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1ed388c0
编写于
7月 18, 2022
作者:
H
Haojun Liao
提交者:
GitHub
7月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15016 from taosdata/feature/3_liaohj
fix(query): add limit/offset implementation in exchange operator.
上级
1ea48b62
95db2fb8
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
139 addition
and
79 deletion
+139
-79
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+3
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+16
-10
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+19
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+79
-52
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+14
-8
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+7
-7
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+1
-1
未找到文件。
source/common/src/tdatablock.c
浏览文件 @
1ed388c0
...
...
@@ -1479,10 +1479,12 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
}
beigin
++
;
}
if
(
dataOffset
>
0
)
{
memmove
(
pColInfoData
->
pData
,
pColInfoData
->
pData
+
dataOffset
,
dataLen
);
memmove
(
pColInfoData
->
varmeta
.
offset
,
&
pColInfoData
->
varmeta
.
offset
[
start
],
(
end
-
start
)
*
sizeof
(
int32_t
));
}
memmove
(
pColInfoData
->
varmeta
.
offset
,
&
pColInfoData
->
varmeta
.
offset
[
start
],
(
end
-
start
)
*
sizeof
(
int32_t
));
return
dataLen
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
1ed388c0
...
...
@@ -247,6 +247,16 @@ typedef struct SLoadRemoteDataInfo {
uint64_t
totalElapsed
;
// total elapsed time
}
SLoadRemoteDataInfo
;
typedef
struct
SLimitInfo
{
SLimit
limit
;
SLimit
slimit
;
uint64_t
currentGroupId
;
int64_t
remainGroupOffset
;
int64_t
numOfOutputGroups
;
int64_t
remainOffset
;
int64_t
numOfOutputRows
;
}
SLimitInfo
;
typedef
struct
SExchangeInfo
{
SArray
*
pSources
;
SArray
*
pSourceDataInfo
;
...
...
@@ -257,6 +267,7 @@ typedef struct SExchangeInfo {
int32_t
current
;
SLoadRemoteDataInfo
loadInfo
;
uint64_t
self
;
SLimitInfo
limitInfo
;
}
SExchangeInfo
;
typedef
struct
SColMatchInfo
{
...
...
@@ -542,15 +553,7 @@ typedef struct SProjectOperatorInfo {
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SSDataBlock
*
existDataBlock
;
SArray
*
pPseudoColInfo
;
SLimit
limit
;
SLimit
slimit
;
uint64_t
groupId
;
int64_t
curSOffset
;
int64_t
curGroupOutput
;
int64_t
curOffset
;
int64_t
curOutput
;
SLimitInfo
limitInfo
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
...
...
@@ -791,6 +794,9 @@ int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
const
char
*
pkey
);
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
...
...
@@ -837,7 +843,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSort
Phy
Node
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
dowStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhysiNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
pSortInfo
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createLastrowScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
1ed388c0
...
...
@@ -898,4 +898,23 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
}
return
w
;
}
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
)
{
return
(
pLimitInfo
->
limit
.
limit
!=
-
1
||
pLimitInfo
->
limit
.
offset
!=
-
1
||
pLimitInfo
->
slimit
.
limit
!=
-
1
||
pLimitInfo
->
slimit
.
offset
!=
-
1
);
}
static
int64_t
getLimit
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
limit
;
}
static
int64_t
getOffset
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
offset
;
}
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
)
{
SLimit
limit
=
{.
limit
=
getLimit
(
pLimit
),
.
offset
=
getOffset
(
pLimit
)};
SLimit
slimit
=
{.
limit
=
getLimit
(
pSLimit
),
.
offset
=
getOffset
(
pSLimit
)};
pLimitInfo
->
limit
=
limit
;
pLimitInfo
->
slimit
=
slimit
;
pLimitInfo
->
remainOffset
=
limit
.
offset
;
pLimitInfo
->
remainGroupOffset
=
slimit
.
offset
;
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
1ed388c0
...
...
@@ -43,6 +43,11 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
enum
{
PROJECT_RETRIEVE_CONTINUE
=
0x1
,
PROJECT_RETRIEVE_DONE
=
0x2
,
};
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
...
...
@@ -2343,7 +2348,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return
TSDB_CODE_SUCCESS
;
}
static
SSDataBlock
*
doLoadRemoteData
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
doLoadRemoteData
Impl
(
SOperatorInfo
*
pOperator
)
{
SExchangeInfo
*
pExchangeInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2369,6 +2374,44 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
}
}
static
SSDataBlock
*
doLoadRemoteData
(
SOperatorInfo
*
pOperator
)
{
SExchangeInfo
*
pExchangeInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
while
(
1
)
{
SSDataBlock
*
pBlock
=
doLoadRemoteDataImpl
(
pOperator
);
if
(
pBlock
==
NULL
)
{
return
NULL
;
}
ASSERT
(
pBlock
==
pExchangeInfo
->
pResult
);
SLimitInfo
*
pLimitInfo
=
&
pExchangeInfo
->
limitInfo
;
if
(
hasLimitOffsetInfo
(
pLimitInfo
))
{
int32_t
status
=
handleLimitOffset
(
pOperator
,
pLimitInfo
,
pExchangeInfo
->
pResult
,
false
);
if
(
status
==
PROJECT_RETRIEVE_CONTINUE
)
{
continue
;
}
else
if
(
status
==
PROJECT_RETRIEVE_DONE
)
{
size_t
rows
=
pExchangeInfo
->
pResult
->
info
.
rows
;
pExchangeInfo
->
limitInfo
.
numOfOutputRows
+=
rows
;
if
(
rows
==
0
)
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
else
{
return
pExchangeInfo
->
pResult
;
}
}
}
else
{
return
pExchangeInfo
->
pResult
;
}
}
}
static
int32_t
initDataSource
(
int32_t
numOfSources
,
SExchangeInfo
*
pInfo
,
const
char
*
id
)
{
pInfo
->
pSourceDataInfo
=
taosArrayInit
(
numOfSources
,
sizeof
(
SSourceDataInfo
));
if
(
pInfo
->
pSourceDataInfo
==
NULL
)
{
...
...
@@ -2408,6 +2451,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush
(
pInfo
->
pSources
,
pNode
);
}
initLimitInfo
(
pExNode
->
node
.
pLimit
,
pExNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
self
=
taosAddRef
(
exchangeObjRefPool
,
pInfo
);
return
initDataSource
(
numOfSources
,
pInfo
,
id
);
...
...
@@ -3151,68 +3195,60 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
return
TDB_CODE_SUCCESS
;
}
enum
{
PROJECT_RETRIEVE_CONTINUE
=
0x1
,
PROJECT_RETRIEVE_DONE
=
0x2
,
};
static
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SProjectOperatorInfo
*
pProjectInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pInfo
=
&
pProjectInfo
->
binfo
;
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
if
(
pProjectInfo
->
curSOffset
>
0
)
{
if
(
pProjectInfo
->
groupId
==
0
)
{
// it is the first group
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pInfo
->
pRes
);
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pProjectInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pProjectInfo
->
curSOffset
-=
1
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
// ignore data block in current group
if
(
p
ProjectInfo
->
curS
Offset
>
0
)
{
blockDataCleanup
(
p
Info
->
pRes
);
if
(
p
LimitInfo
->
remainGroup
Offset
>
0
)
{
blockDataCleanup
(
p
Block
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
p
ProjectInfo
->
g
roupId
=
pBlock
->
info
.
groupId
;
p
LimitInfo
->
currentG
roupId
=
pBlock
->
info
.
groupId
;
}
if
(
p
ProjectInfo
->
groupId
!=
0
&&
pProjectInfo
->
g
roupId
!=
pBlock
->
info
.
groupId
)
{
p
ProjectInfo
->
curGroupOutput
+=
1
;
if
((
p
ProjectInfo
->
slimit
.
limit
>
0
)
&&
(
pProjectInfo
->
slimit
.
limit
<=
pProjectInfo
->
curGroupOutput
))
{
if
(
p
LimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentG
roupId
!=
pBlock
->
info
.
groupId
)
{
p
LimitInfo
->
numOfOutputGroups
+=
1
;
if
((
p
LimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
p
Res
);
blockDataCleanup
(
p
Block
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
p
ProjectInfo
->
curOffset
=
0
;
p
ProjectInfo
->
curOutput
=
0
;
p
LimitInfo
->
numOfOutputRows
=
0
;
p
LimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
p
ProjectInfo
->
g
roupId
=
pBlock
->
info
.
groupId
;
p
LimitInfo
->
currentG
roupId
=
pBlock
->
info
.
groupId
;
if
(
p
ProjectInfo
->
curOffset
>=
pRes
->
info
.
rows
)
{
p
ProjectInfo
->
curOffset
-=
pRes
->
info
.
rows
;
blockDataCleanup
(
p
Res
);
if
(
p
LimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
p
LimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataCleanup
(
p
Block
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
p
ProjectInfo
->
curOffset
<
pRes
->
info
.
rows
&&
pProjectInfo
->
cur
Offset
>
0
)
{
blockDataTrimFirstNRows
(
p
Res
,
pProjectInfo
->
cur
Offset
);
p
ProjectInfo
->
cur
Offset
=
0
;
}
else
if
(
p
LimitInfo
->
remainOffset
<
pBlock
->
info
.
rows
&&
pLimitInfo
->
remain
Offset
>
0
)
{
blockDataTrimFirstNRows
(
p
Block
,
pLimitInfo
->
remain
Offset
);
p
LimitInfo
->
remain
Offset
=
0
;
}
// check for the limitation in each group
if
(
p
ProjectInfo
->
limit
.
limit
>=
0
&&
pProjectInfo
->
curOutput
+
pRes
->
info
.
rows
>=
pProjec
tInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
p
ProjectInfo
->
limit
.
limit
-
pProjectInfo
->
curOutput
);
blockDataKeepFirstNRows
(
p
Res
,
keepRows
);
if
(
p
ProjectInfo
->
slimit
.
limit
>
0
&&
pProjectInfo
->
slimit
.
limit
<=
pProjectInfo
->
curGroupOutput
)
{
if
(
p
LimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimi
tInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
p
LimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
p
Block
,
keepRows
);
if
(
p
LimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -3222,8 +3258,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
||
pProjec
tInfo
->
slimit
.
offset
!=
-
1
||
p
Projec
tInfo
->
slimit
.
limit
!=
-
1
)
{
if
(
(
!
holdDataInBuf
)
||
(
pBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
||
pLimi
tInfo
->
slimit
.
offset
!=
-
1
||
p
Limi
tInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
...
...
@@ -3309,7 +3345,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
code
);
}
int32_t
status
=
handleLimitOffset
(
pOperator
,
pBlock
);
int32_t
status
=
handleLimitOffset
(
pOperator
,
&
pProjectInfo
->
limitInfo
,
pInfo
->
pRes
,
true
);
// filter shall be applied after apply functions and limit/offset on the result
doFilter
(
pProjectInfo
->
pFilterNode
,
pInfo
->
pRes
);
...
...
@@ -3321,9 +3357,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
}
pProjectInfo
->
curOutput
+=
pInfo
->
pRes
->
info
.
rows
;
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
pProjectInfo
->
limitInfo
.
numOfOutputRows
+=
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
if
(
pOperator
->
cost
.
openCost
==
0
)
{
...
...
@@ -3767,10 +3803,6 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
return
pList
;
}
static
int64_t
getLimit
(
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
limit
;
}
static
int64_t
getOffset
(
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
offset
;
}
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SProjectOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SProjectOperatorInfo
));
...
...
@@ -3783,13 +3815,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
SExprInfo
*
pExprInfo
=
createExprInfo
(
pProjPhyNode
->
pProjections
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pProjPhyNode
->
node
.
pOutputDataBlockDesc
);
SLimit
limit
=
{.
limit
=
getLimit
(
pProjPhyNode
->
node
.
pLimit
),
.
offset
=
getOffset
(
pProjPhyNode
->
node
.
pLimit
)};
SLimit
slimit
=
{.
limit
=
getLimit
(
pProjPhyNode
->
node
.
pSlimit
),
.
offset
=
getOffset
(
pProjPhyNode
->
node
.
pSlimit
)};
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
limit
=
limit
;
pInfo
->
slimit
=
slimit
;
pInfo
->
curOffset
=
limit
.
offset
;
pInfo
->
curSOffset
=
slimit
.
offset
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
1ed388c0
...
...
@@ -440,19 +440,19 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
recordNewGroupKeys
(
pInfo
->
pGroupCols
,
pInfo
->
pGroupColVals
,
pBlock
,
j
);
int32_t
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
SDataGroupInfo
*
pGInfo
=
NULL
;
void
*
pPage
=
getCurrentDataGroupInfo
(
pInfo
,
&
pGInfo
,
len
);
SDataGroupInfo
*
pG
roup
Info
=
NULL
;
void
*
pPage
=
getCurrentDataGroupInfo
(
pInfo
,
&
pG
roup
Info
,
len
);
pGInfo
->
numOfRows
+=
1
;
if
(
pGInfo
->
groupId
==
0
)
{
pGInfo
->
groupId
=
calcGroupId
(
pInfo
->
keyBuf
,
len
);
pGroupInfo
->
numOfRows
+=
1
;
// group id
if
(
pGroupInfo
->
groupId
==
0
)
{
pGroupInfo
->
groupId
=
calcGroupId
(
pInfo
->
keyBuf
,
len
);
}
// number of rows
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
// group id
size_t
numOfCols
=
pOperator
->
exprSupp
.
numOfExprs
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExpr
=
&
pOperator
->
exprSupp
.
pExprInfo
[
i
];
...
...
@@ -603,7 +603,13 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
static
int
compareDataGroupInfo
(
const
void
*
group1
,
const
void
*
group2
)
{
const
SDataGroupInfo
*
pGroupInfo1
=
group1
;
const
SDataGroupInfo
*
pGroupInfo2
=
group2
;
return
pGroupInfo1
->
groupId
-
pGroupInfo2
->
groupId
;
if
(
pGroupInfo1
->
groupId
==
pGroupInfo2
->
groupId
)
{
ASSERT
(
0
);
return
0
;
}
return
(
pGroupInfo1
->
groupId
<
pGroupInfo2
->
groupId
)
?
-
1
:
1
;
}
static
SSDataBlock
*
buildPartitionResult
(
SOperatorInfo
*
pOperator
)
{
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
1ed388c0
...
...
@@ -22,31 +22,31 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
// todo add limit/offset impl
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SSortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
/* || rowSize > 100 * 1024 * 1024*/
)
{
goto
_error
;
}
SDataBlockDescNode
*
pDescNode
=
pSort
Phy
Node
->
node
.
pOutputDataBlockDesc
;
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSort
Phy
Node
->
pExprs
,
NULL
,
&
numOfCols
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortNode
->
pExprs
,
NULL
,
&
numOfCols
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pSort
Phy
Node
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
extractColMatchInfo
(
pSortNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
pOperator
->
exprSupp
.
pCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pOperator
->
exprSupp
.
rowEntryInfoOffset
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
initResultSizeInfo
(
pOperator
,
1024
);
pInfo
->
pSortInfo
=
createSortInfo
(
pSort
Phy
Node
->
pSortKeys
);
pInfo
->
pCondition
=
pSort
Phy
Node
->
node
.
pConditions
;
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
pInfo
->
pCondition
=
pSortNode
->
node
.
pConditions
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
pOperator
->
name
=
"SortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
...
...
source/libs/executor/src/tsort.c
浏览文件 @
1ed388c0
...
...
@@ -91,7 +91,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
tsortSetComparFp
(
pSortHandle
,
msortComparFn
);
if
(
idstr
!=
NULL
)
{
pSortHandle
->
idStr
=
strdup
(
idstr
);
pSortHandle
->
idStr
=
strdup
(
idstr
);
}
return
pSortHandle
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录