Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce1b294c
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ce1b294c
编写于
8月 10, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: memory leak issues
上级
81439ba0
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
388 addition
and
120 deletion
+388
-120
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+13
-0
source/libs/executor/inc/groupcache.h
source/libs/executor/inc/groupcache.h
+2
-3
source/libs/executor/inc/operator.h
source/libs/executor/inc/operator.h
+1
-6
source/libs/executor/src/dynqueryctrloperator.c
source/libs/executor/src/dynqueryctrloperator.c
+112
-79
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+10
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+0
-4
source/libs/executor/src/executorInt.c
source/libs/executor/src/executorInt.c
+137
-0
source/libs/executor/src/groupcacheoperator.c
source/libs/executor/src/groupcacheoperator.c
+85
-4
source/libs/executor/src/mergejoinoperator.c
source/libs/executor/src/mergejoinoperator.c
+10
-3
source/libs/executor/src/operator.c
source/libs/executor/src/operator.c
+15
-20
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-0
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+2
-0
未找到文件。
source/libs/executor/inc/executorInt.h
浏览文件 @
ce1b294c
...
...
@@ -48,6 +48,14 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
typedef
struct
STsdbReader
STsdbReader
;
typedef
struct
STqReader
STqReader
;
typedef
enum
SOperatorParamType
{
OP_GET_PARAM
=
1
,
OP_NOTIFY_PARAM
}
SOperatorParamType
;
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
...
...
@@ -741,6 +749,11 @@ void streamOpReloadState(struct SOperatorInfo* pOperator);
void
destroyOperatorParamValue
(
void
*
pValues
);
int32_t
mergeOperatorParams
(
SOperatorParam
*
pDst
,
SOperatorParam
*
pSrc
);
int32_t
buildTableScanOperatorParam
(
SOperatorParam
**
ppRes
,
SArray
*
pUidList
,
int32_t
srcOpType
,
bool
tableSeq
);
void
freeExchangeGetBasicOperatorParam
(
void
*
pParam
);
void
freeOperatorParam
(
SOperatorParam
*
pParam
,
SOperatorParamType
type
);
void
freeResetOperatorParams
(
struct
SOperatorInfo
*
pOperator
,
SOperatorParamType
type
,
bool
allFree
);
SSDataBlock
*
getNextBlockFromDownstreamImpl
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
,
bool
clearParam
);
#ifdef __cplusplus
}
...
...
source/libs/executor/inc/groupcache.h
浏览文件 @
ce1b294c
...
...
@@ -56,7 +56,7 @@ typedef struct SGcDownstreamCtx {
SRWLatch
grpLock
;
int64_t
fetchSessionId
;
SArray
*
pNewGrpList
;
// SArray<SGcNewGroupInfo>
SSHashObj
*
pVgTbHash
;
SSHashObj
*
pVgTbHash
;
// SHash<SGcVgroupCtx>
SHashObj
*
pGrpHash
;
SRWLatch
blkLock
;
SSDataBlock
*
pBaseBlock
;
...
...
@@ -136,7 +136,6 @@ typedef struct SGcBlkBufInfo {
}
SGcBlkBufInfo
;
typedef
struct
SGcExecInfo
{
int32_t
downstreamNum
;
int64_t
*
pDownstreamBlkNum
;
}
SGcExecInfo
;
...
...
@@ -157,13 +156,13 @@ typedef struct SGcBlkCacheInfo {
}
SGcBlkCacheInfo
;
typedef
struct
SGroupCacheOperatorInfo
{
TdThreadMutex
sessionMutex
;
int64_t
maxCacheSize
;
int64_t
currentBlkId
;
SGroupColsInfo
groupColsInfo
;
bool
globalGrp
;
bool
grpByUid
;
bool
batchFetch
;
int32_t
downstreamNum
;
SGcDownstreamCtx
*
pDownstreams
;
SGcBlkCacheInfo
blkCache
;
SHashObj
*
pGrpHash
;
...
...
source/libs/executor/inc/operator.h
浏览文件 @
ce1b294c
...
...
@@ -20,11 +20,6 @@
extern
"C"
{
#endif
typedef
enum
SOperatorParamType
{
OP_GET_PARAM
=
1
,
OP_NOTIFY_PARAM
}
SOperatorParamType
;
typedef
struct
SOperatorCostInfo
{
double
openCost
;
double
totalCost
;
...
...
@@ -180,7 +175,7 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
SSDataBlock
*
optrDefaultGetNextExtFn
(
struct
SOperatorInfo
*
pOperator
,
SOperatorParam
*
pParam
);
int32_t
optrDefaultNotifyFn
(
struct
SOperatorInfo
*
pOperator
,
SOperatorParam
*
pParam
);
SSDataBlock
*
getNextBlockFromDownstream
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
);
SSDataBlock
*
getNextBlockFromDownstream
Once
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
);
SSDataBlock
*
getNextBlockFromDownstream
Remain
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
);
int16_t
getOperatorResultBlockId
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
);
SOperatorInfo
*
createOperator
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
SNode
*
pTagCond
,
...
...
source/libs/executor/src/dynqueryctrloperator.c
浏览文件 @
ce1b294c
...
...
@@ -33,38 +33,122 @@ void freeVgTableList(void* ptr) {
taosArrayDestroy
(
*
(
SArray
**
)
ptr
);
}
static
void
destroyStbJoinTableList
(
SStbJoinTableList
*
pListHead
)
{
SStbJoinTableList
*
pNext
=
NULL
;
while
(
pListHead
)
{
taosMemoryFree
(
pListHead
->
pLeftVg
);
taosMemoryFree
(
pListHead
->
pLeftUid
);
taosMemoryFree
(
pListHead
->
pRightVg
);
taosMemoryFree
(
pListHead
->
pRightUid
);
pNext
=
pListHead
->
pNext
;
taosMemoryFree
(
pListHead
);
pListHead
=
pNext
;
}
}
static
void
destroyDynQueryCtrlOperator
(
void
*
param
)
{
SDynQueryCtrlOperatorInfo
*
pDyn
=
(
SDynQueryCtrlOperatorInfo
*
)
param
;
static
void
destroyStbJoinDynCtrlInfo
(
SStbJoinDynCtrlInfo
*
pStbJoin
)
{
qError
(
"dynQueryCtrl exec info, prevBlk:%"
PRId64
", prevRows:%"
PRId64
", postBlk:%"
PRId64
", postRows:%"
PRId64
", leftCacheNum:%"
PRId64
", rightCacheNum:%"
PRId64
,
p
Dyn
->
stbJoin
.
execInfo
.
prevBlkNum
,
pDyn
->
stbJoin
.
execInfo
.
prevBlkRows
,
pDyn
->
stbJoin
.
execInfo
.
postBlkNum
,
p
Dyn
->
stbJoin
.
execInfo
.
postBlkRows
,
pDyn
->
stbJoin
.
execInfo
.
leftCacheNum
,
pDyn
->
stbJoin
.
execInfo
.
rightCacheNum
);
p
StbJoin
->
execInfo
.
prevBlkNum
,
pStbJoin
->
execInfo
.
prevBlkRows
,
pStbJoin
->
execInfo
.
postBlkNum
,
p
StbJoin
->
execInfo
.
postBlkRows
,
pStbJoin
->
execInfo
.
leftCacheNum
,
pStbJoin
->
execInfo
.
rightCacheNum
);
if
(
p
Dyn
->
stbJoin
.
basic
.
batchFetch
)
{
if
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
leftHash
)
{
tSimpleHashSetFreeFp
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
leftHash
,
freeVgTableList
);
tSimpleHashCleanup
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
leftHash
);
if
(
p
StbJoin
->
basic
.
batchFetch
)
{
if
(
p
StbJoin
->
ctx
.
prev
.
leftHash
)
{
tSimpleHashSetFreeFp
(
p
StbJoin
->
ctx
.
prev
.
leftHash
,
freeVgTableList
);
tSimpleHashCleanup
(
p
StbJoin
->
ctx
.
prev
.
leftHash
);
}
if
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
rightHash
)
{
tSimpleHashSetFreeFp
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
rightHash
,
freeVgTableList
);
tSimpleHashCleanup
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
rightHash
);
if
(
p
StbJoin
->
ctx
.
prev
.
rightHash
)
{
tSimpleHashSetFreeFp
(
p
StbJoin
->
ctx
.
prev
.
rightHash
,
freeVgTableList
);
tSimpleHashCleanup
(
p
StbJoin
->
ctx
.
prev
.
rightHash
);
}
}
else
{
if
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
leftCache
)
{
tSimpleHashCleanup
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
leftCache
);
if
(
p
StbJoin
->
ctx
.
prev
.
leftCache
)
{
tSimpleHashCleanup
(
p
StbJoin
->
ctx
.
prev
.
leftCache
);
}
if
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
rightCache
)
{
tSimpleHashCleanup
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
rightCache
);
if
(
p
StbJoin
->
ctx
.
prev
.
rightCache
)
{
tSimpleHashCleanup
(
p
StbJoin
->
ctx
.
prev
.
rightCache
);
}
if
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
onceTable
)
{
tSimpleHashCleanup
(
p
Dyn
->
stbJoin
.
ctx
.
prev
.
onceTable
);
if
(
p
StbJoin
->
ctx
.
prev
.
onceTable
)
{
tSimpleHashCleanup
(
p
StbJoin
->
ctx
.
prev
.
onceTable
);
}
}
destroyStbJoinTableList
(
pStbJoin
->
ctx
.
prev
.
pListHead
);
}
static
void
destroyDynQueryCtrlOperator
(
void
*
param
)
{
SDynQueryCtrlOperatorInfo
*
pDyn
=
(
SDynQueryCtrlOperatorInfo
*
)
param
;
switch
(
pDyn
->
qType
)
{
case
DYN_QTYPE_STB_HASH
:
destroyStbJoinDynCtrlInfo
(
&
pDyn
->
stbJoin
);
break
;
default:
qError
(
"unsupported dynamic query ctrl type: %d"
,
pDyn
->
qType
);
break
;
}
taosMemoryFreeClear
(
param
);
}
static
FORCE_INLINE
int32_t
buildGroupCacheOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
vgId
,
int64_t
tbUid
,
bool
needCache
,
SOperatorParam
*
pChild
)
{
static
FORCE_INLINE
bool
tableNeedCache
(
int64_t
uid
,
SStbJoinPrevJoinCtx
*
pPrev
,
SStbJoinPostJoinCtx
*
pPost
,
bool
rightTable
,
bool
batchFetch
)
{
if
(
batchFetch
)
{
return
true
;
}
if
(
rightTable
)
{
return
pPost
->
rightCurrUid
==
pPost
->
rightNextUid
;
}
uint32_t
*
num
=
tSimpleHashGet
(
pPrev
->
leftCache
,
&
uid
,
sizeof
(
uid
));
return
(
NULL
==
num
)
?
false
:
true
;
}
static
void
updatePostJoinCurrTableInfo
(
SStbJoinDynCtrlInfo
*
pStbJoin
)
{
SStbJoinPrevJoinCtx
*
pPrev
=
&
pStbJoin
->
ctx
.
prev
;
SStbJoinPostJoinCtx
*
pPost
=
&
pStbJoin
->
ctx
.
post
;
SStbJoinTableList
*
pNode
=
pPrev
->
pListHead
;
int32_t
*
leftVgId
=
pNode
->
pLeftVg
+
pNode
->
readIdx
;
int32_t
*
rightVgId
=
pNode
->
pRightVg
+
pNode
->
readIdx
;
int64_t
*
leftUid
=
pNode
->
pLeftUid
+
pNode
->
readIdx
;
int64_t
*
rightUid
=
pNode
->
pRightUid
+
pNode
->
readIdx
;
int64_t
readIdx
=
pNode
->
readIdx
+
1
;
int64_t
rightPrevUid
=
pPost
->
rightCurrUid
;
pPost
->
leftCurrUid
=
*
leftUid
;
pPost
->
rightCurrUid
=
*
rightUid
;
pPost
->
leftVgId
=
*
leftVgId
;
pPost
->
rightVgId
=
*
rightVgId
;
while
(
true
)
{
if
(
readIdx
<
pNode
->
uidNum
)
{
pPost
->
rightNextUid
=
*
(
pNode
->
pRightUid
+
readIdx
);
break
;
}
pNode
=
pNode
->
pNext
;
if
(
NULL
==
pNode
)
{
pPost
->
rightNextUid
=
0
;
break
;
}
rightUid
=
pNode
->
pRightUid
;
readIdx
=
0
;
}
pPost
->
leftNeedCache
=
tableNeedCache
(
*
leftUid
,
pPrev
,
pPost
,
false
,
pStbJoin
->
basic
.
batchFetch
);
pPost
->
rightNeedCache
=
tableNeedCache
(
*
rightUid
,
pPrev
,
pPost
,
true
,
pStbJoin
->
basic
.
batchFetch
);
if
(
pPost
->
rightNeedCache
&&
rightPrevUid
!=
pPost
->
rightCurrUid
)
{
tSimpleHashPut
(
pPrev
->
rightCache
,
&
pPost
->
rightCurrUid
,
sizeof
(
pPost
->
rightCurrUid
),
NULL
,
0
);
pStbJoin
->
execInfo
.
rightCacheNum
++
;
}
}
static
int32_t
buildGroupCacheOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
vgId
,
int64_t
tbUid
,
bool
needCache
,
SOperatorParam
*
pChild
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -100,7 +184,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
}
static
FORCE_INLINE
int32_t
buildGroupCacheNotifyOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
vgId
,
int64_t
tbUid
)
{
static
int32_t
buildGroupCacheNotifyOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
vgId
,
int64_t
tbUid
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -124,7 +208,7 @@ static FORCE_INLINE int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam**
}
static
FORCE_INLINE
int32_t
buildExchangeOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
*
pVgId
,
int64_t
*
pUid
)
{
static
int32_t
buildExchangeOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
*
pVgId
,
int64_t
*
pUid
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -155,7 +239,7 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
}
static
FORCE_INLINE
int32_t
buildBatchExchangeOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
SSHashObj
*
pVg
)
{
static
int32_t
buildBatchExchangeOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
SSHashObj
*
pVg
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -173,6 +257,7 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR
taosMemoryFree
(
pExc
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tSimpleHashSetFreeFp
(
pExc
->
pBatchs
,
freeExchangeGetBasicOperatorParam
);
SExchangeOperatorBasicParam
basic
;
basic
.
srcOpType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
...
...
@@ -189,6 +274,7 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR
tSimpleHashPut
(
pExc
->
pBatchs
,
pVgId
,
sizeof
(
*
pVgId
),
&
basic
,
sizeof
(
basic
));
qTrace
(
"build downstreamIdx %d batch scan, vgId:%d, uidNum:%"
PRId64
,
downstreamIdx
,
*
pVgId
,
(
int64_t
)
taosArrayGetSize
(
pUidList
));
*
(
SArray
**
)
p
=
NULL
;
}
(
*
ppRes
)
->
opType
=
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
;
...
...
@@ -199,7 +285,7 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR
}
static
FORCE_INLINE
int32_t
buildMergeJoinOperatorParam
(
SOperatorParam
**
ppRes
,
bool
initParam
,
SOperatorParam
*
pChild0
,
SOperatorParam
*
pChild1
)
{
static
int32_t
buildMergeJoinOperatorParam
(
SOperatorParam
**
ppRes
,
bool
initParam
,
SOperatorParam
*
pChild0
,
SOperatorParam
*
pChild1
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -229,7 +315,7 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
}
static
FORCE_INLINE
int32_t
buildMergeJoinNotifyOperatorParam
(
SOperatorParam
**
ppRes
,
SOperatorParam
*
pChild0
,
SOperatorParam
*
pChild1
)
{
static
int32_t
buildMergeJoinNotifyOperatorParam
(
SOperatorParam
**
ppRes
,
SOperatorParam
*
pChild0
,
SOperatorParam
*
pChild1
)
{
*
ppRes
=
taosMemoryMalloc
(
sizeof
(
SOperatorParam
));
if
(
NULL
==
*
ppRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -251,63 +337,9 @@ static FORCE_INLINE int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** p
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
bool
tableNeedCache
(
int64_t
uid
,
SStbJoinPrevJoinCtx
*
pPrev
,
SStbJoinPostJoinCtx
*
pPost
,
bool
rightTable
,
bool
batchFetch
)
{
if
(
batchFetch
)
{
return
true
;
}
if
(
rightTable
)
{
return
pPost
->
rightCurrUid
==
pPost
->
rightNextUid
;
}
uint32_t
*
num
=
tSimpleHashGet
(
pPrev
->
leftCache
,
&
uid
,
sizeof
(
uid
));
return
(
NULL
==
num
)
?
false
:
true
;
}
static
void
updatePostJoinCurrTableInfo
(
SStbJoinDynCtrlInfo
*
pStbJoin
)
{
SStbJoinPrevJoinCtx
*
pPrev
=
&
pStbJoin
->
ctx
.
prev
;
SStbJoinPostJoinCtx
*
pPost
=
&
pStbJoin
->
ctx
.
post
;
SStbJoinTableList
*
pNode
=
pPrev
->
pListHead
;
int32_t
*
leftVgId
=
pNode
->
pLeftVg
+
pNode
->
readIdx
;
int32_t
*
rightVgId
=
pNode
->
pRightVg
+
pNode
->
readIdx
;
int64_t
*
leftUid
=
pNode
->
pLeftUid
+
pNode
->
readIdx
;
int64_t
*
rightUid
=
pNode
->
pRightUid
+
pNode
->
readIdx
;
int64_t
readIdx
=
pNode
->
readIdx
+
1
;
int64_t
rightPrevUid
=
pPost
->
rightCurrUid
;
pPost
->
leftCurrUid
=
*
leftUid
;
pPost
->
rightCurrUid
=
*
rightUid
;
pPost
->
leftVgId
=
*
leftVgId
;
pPost
->
rightVgId
=
*
rightVgId
;
while
(
true
)
{
if
(
readIdx
<
pNode
->
uidNum
)
{
pPost
->
rightNextUid
=
*
(
pNode
->
pRightUid
+
readIdx
);
break
;
}
pNode
=
pNode
->
pNext
;
if
(
NULL
==
pNode
)
{
pPost
->
rightNextUid
=
0
;
break
;
}
rightUid
=
pNode
->
pRightUid
;
readIdx
=
0
;
}
pPost
->
leftNeedCache
=
tableNeedCache
(
*
leftUid
,
pPrev
,
pPost
,
false
,
pStbJoin
->
basic
.
batchFetch
);
pPost
->
rightNeedCache
=
tableNeedCache
(
*
rightUid
,
pPrev
,
pPost
,
true
,
pStbJoin
->
basic
.
batchFetch
);
if
(
pPost
->
rightNeedCache
&&
rightPrevUid
!=
pPost
->
rightCurrUid
)
{
tSimpleHashPut
(
pPrev
->
rightCache
,
&
pPost
->
rightCurrUid
,
sizeof
(
pPost
->
rightCurrUid
),
NULL
,
0
);
pStbJoin
->
execInfo
.
rightCacheNum
++
;
}
}
static
FORCE_INLINE
int32_t
buildBatchTableScanOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
SSHashObj
*
pVg
)
{
static
int32_t
buildBatchTableScanOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
SSHashObj
*
pVg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
vgNum
=
tSimpleHashGetSize
(
pVg
);
if
(
vgNum
<=
0
||
vgNum
>
1
)
{
...
...
@@ -325,13 +357,14 @@ static FORCE_INLINE int32_t buildBatchTableScanOperatorParam(SOperatorParam** pp
if
(
code
)
{
return
code
;
}
*
(
SArray
**
)
p
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int32_t
buildSingleTableScanOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
*
pVgId
,
int64_t
*
pUid
)
{
static
int32_t
buildSingleTableScanOperatorParam
(
SOperatorParam
**
ppRes
,
int32_t
downstreamIdx
,
int32_t
*
pVgId
,
int64_t
*
pUid
)
{
SArray
*
pUidList
=
taosArrayInit
(
1
,
sizeof
(
int64_t
));
if
(
NULL
==
pUidList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
ce1b294c
...
...
@@ -382,7 +382,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroyEx
(
pExInfo
->
pRecycledBlocks
,
freeBlock
);
blockDataDestroy
(
pExInfo
->
pDummyBlock
);
tSimpleHashCleanup
(
pExInfo
->
pHashSources
);
tsem_destroy
(
&
pExInfo
->
ready
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -438,11 +439,14 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
STableScanOperatorParam
*
pScan
=
taosMemoryMalloc
(
sizeof
(
STableScanOperatorParam
));
if
(
NULL
==
pScan
)
{
taosMemoryFreeClear
(
*
ppRes
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pScan
->
pUidList
=
taosArrayDup
(
pUidList
,
NULL
);
if
(
NULL
==
pScan
->
pUidList
)
{
taosMemoryFree
(
pScan
);
taosMemoryFreeClear
(
*
ppRes
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pScan
->
tableSeq
=
tableSeq
;
...
...
@@ -500,6 +504,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
if
(
msgSize
<
0
)
{
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pWrapper
);
freeOperatorParam
(
req
.
pOpParam
,
OP_GET_PARAM
);
return
pTaskInfo
->
code
;
}
...
...
@@ -507,6 +512,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
if
(
NULL
==
msg
)
{
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pWrapper
);
freeOperatorParam
(
req
.
pOpParam
,
OP_GET_PARAM
);
return
pTaskInfo
->
code
;
}
...
...
@@ -514,9 +520,12 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pWrapper
);
taosMemoryFree
(
msg
);
freeOperatorParam
(
req
.
pOpParam
,
OP_GET_PARAM
);
return
pTaskInfo
->
code
;
}
freeOperatorParam
(
req
.
pOpParam
,
OP_GET_PARAM
);
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", execId:%d, %p, %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pSource
->
execId
,
pExchangeInfo
,
sourceIndex
,
totalSources
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
ce1b294c
...
...
@@ -502,10 +502,6 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
return
((
SExecTaskInfo
*
)
tinfo
)
->
dynamicTask
;
}
void
destroyOperatorParamValue
(
void
*
pValues
)
{
}
void
destroyOperatorParam
(
SOperatorParam
*
pParam
)
{
if
(
NULL
==
pParam
)
{
return
;
...
...
source/libs/executor/src/executorInt.c
浏览文件 @
ce1b294c
...
...
@@ -1070,3 +1070,140 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream
->
fpSet
.
reloadStreamStateFn
(
downstream
);
}
}
void
freeOperatorParamImpl
(
SOperatorParam
*
pParam
,
SOperatorParamType
type
)
{
int32_t
childrenNum
=
taosArrayGetSize
(
pParam
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
childrenNum
;
++
i
)
{
SOperatorParam
*
pChild
=
taosArrayGetP
(
pParam
->
pChildren
,
i
);
freeOperatorParam
(
pChild
,
type
);
}
taosArrayDestroy
(
pParam
->
pChildren
);
taosMemoryFree
(
pParam
->
value
);
taosMemoryFree
(
pParam
);
}
void
freeExchangeGetBasicOperatorParam
(
void
*
pParam
)
{
SExchangeOperatorBasicParam
*
pBasic
=
(
SExchangeOperatorBasicParam
*
)
pParam
;
taosArrayDestroy
(
pBasic
->
uidList
);
}
void
freeExchangeGetOperatorParam
(
SOperatorParam
*
pParam
)
{
SExchangeOperatorParam
*
pExcParam
=
(
SExchangeOperatorParam
*
)
pParam
->
value
;
if
(
pExcParam
->
multiParams
)
{
SExchangeOperatorBatchParam
*
pExcBatch
=
(
SExchangeOperatorBatchParam
*
)
pParam
->
value
;
tSimpleHashCleanup
(
pExcBatch
->
pBatchs
);
}
else
{
freeExchangeGetBasicOperatorParam
(
&
pExcParam
->
basic
);
}
freeOperatorParamImpl
(
pParam
,
OP_GET_PARAM
);
}
void
freeExchangeNotifyOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_NOTIFY_PARAM
);
}
void
freeGroupCacheGetOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_GET_PARAM
);
}
void
freeGroupCacheNotifyOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_NOTIFY_PARAM
);
}
void
freeMergeJoinGetOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_GET_PARAM
);
}
void
freeMergeJoinNotifyOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_NOTIFY_PARAM
);
}
void
freeTableScanGetOperatorParam
(
SOperatorParam
*
pParam
)
{
STableScanOperatorParam
*
pTableScanParam
=
(
STableScanOperatorParam
*
)
pParam
->
value
;
taosArrayDestroy
(
pTableScanParam
->
pUidList
);
freeOperatorParamImpl
(
pParam
,
OP_GET_PARAM
);
}
void
freeTableScanNotifyOperatorParam
(
SOperatorParam
*
pParam
)
{
freeOperatorParamImpl
(
pParam
,
OP_NOTIFY_PARAM
);
}
void
freeOperatorParam
(
SOperatorParam
*
pParam
,
SOperatorParamType
type
)
{
if
(
NULL
==
pParam
)
{
return
;
}
switch
(
pParam
->
opType
)
{
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
type
==
OP_GET_PARAM
?
freeExchangeGetOperatorParam
(
pParam
)
:
freeExchangeNotifyOperatorParam
(
pParam
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
type
==
OP_GET_PARAM
?
freeGroupCacheGetOperatorParam
(
pParam
)
:
freeGroupCacheNotifyOperatorParam
(
pParam
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
type
==
OP_GET_PARAM
?
freeMergeJoinGetOperatorParam
(
pParam
)
:
freeMergeJoinNotifyOperatorParam
(
pParam
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:
type
==
OP_GET_PARAM
?
freeTableScanGetOperatorParam
(
pParam
)
:
freeTableScanNotifyOperatorParam
(
pParam
);
break
;
default:
qError
(
"unsupported op %d param, type %d"
,
pParam
->
opType
,
type
);
break
;
}
}
void
freeResetOperatorParams
(
struct
SOperatorInfo
*
pOperator
,
SOperatorParamType
type
,
bool
allFree
)
{
SOperatorParam
**
ppParam
=
NULL
;
SOperatorParam
***
pppDownstramParam
=
NULL
;
switch
(
type
)
{
case
OP_GET_PARAM
:
ppParam
=
&
pOperator
->
pOperatorGetParam
;
pppDownstramParam
=
&
pOperator
->
pDownstreamGetParams
;
break
;
case
OP_NOTIFY_PARAM
:
ppParam
=
&
pOperator
->
pOperatorNotifyParam
;
pppDownstramParam
=
&
pOperator
->
pDownstreamNotifyParams
;
break
;
default:
return
;
}
if
(
*
ppParam
)
{
freeOperatorParam
(
*
ppParam
,
type
);
*
ppParam
=
NULL
;
}
if
(
*
pppDownstramParam
)
{
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
if
((
*
pppDownstramParam
)[
i
])
{
freeOperatorParam
((
*
pppDownstramParam
)[
i
],
type
);
(
*
pppDownstramParam
)[
i
]
=
NULL
;
}
}
if
(
allFree
)
{
taosMemoryFreeClear
(
*
pppDownstramParam
);
}
}
}
FORCE_INLINE
SSDataBlock
*
getNextBlockFromDownstreamImpl
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
,
bool
clearParam
)
{
if
(
pOperator
->
pDownstreamGetParams
&&
pOperator
->
pDownstreamGetParams
[
idx
])
{
qDebug
(
"DynOp: op %s start to get block from downstream %s"
,
pOperator
->
name
,
pOperator
->
pDownstream
[
idx
]
->
name
);
SSDataBlock
*
pBlock
=
pOperator
->
pDownstream
[
idx
]
->
fpSet
.
getNextExtFn
(
pOperator
->
pDownstream
[
idx
],
pOperator
->
pDownstreamGetParams
[
idx
]);
if
(
clearParam
)
{
freeOperatorParam
(
pOperator
->
pDownstreamGetParams
[
idx
],
OP_GET_PARAM
);
pOperator
->
pDownstreamGetParams
[
idx
]
=
NULL
;
}
return
pBlock
;
}
return
pOperator
->
pDownstream
[
idx
]
->
fpSet
.
getNextFn
(
pOperator
->
pDownstream
[
idx
]);
}
source/libs/executor/src/groupcacheoperator.c
浏览文件 @
ce1b294c
...
...
@@ -73,15 +73,85 @@ static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, S
}
static
void
logGroupCacheExecInfo
(
SGroupCacheOperatorInfo
*
pGrpCacheOperator
)
{
char
*
buf
=
taosMemoryMalloc
(
pGrpCacheOperator
->
execInfo
.
downstreamNum
*
32
+
100
);
char
*
buf
=
taosMemoryMalloc
(
pGrpCacheOperator
->
downstreamNum
*
32
+
100
);
if
(
NULL
==
buf
)
{
return
;
}
int32_t
offset
=
sprintf
(
buf
,
"groupCache exec info, downstreamBlkNum:"
);
for
(
int32_t
i
=
0
;
i
<
pGrpCacheOperator
->
execInfo
.
downstreamNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pGrpCacheOperator
->
downstreamNum
;
++
i
)
{
offset
+=
sprintf
(
buf
+
offset
,
" %"
PRId64
,
pGrpCacheOperator
->
execInfo
.
pDownstreamBlkNum
[
i
]);
}
qDebug
(
"%s"
,
buf
);
taosMemoryFree
(
buf
);
}
static
void
freeSGcSessionCtx
(
void
*
p
)
{
SGcSessionCtx
*
pSession
=
p
;
if
(
pSession
->
semInit
)
{
tsem_destroy
(
&
pSession
->
waitSem
);
}
}
static
void
freeSGroupCacheFileInfo
(
void
*
p
)
{
SGroupCacheFileInfo
*
pFileInfo
=
p
;
if
(
pFileInfo
->
deleted
)
{
return
;
}
removeGroupCacheFile
(
pFileInfo
);
}
static
void
freeSGcFileCacheCtx
(
SGcFileCacheCtx
*
pFileCtx
)
{
taosHashCleanup
(
pFileCtx
->
pCacheFile
);
}
static
void
freeSGcVgroupCtx
(
void
*
p
)
{
SGcVgroupCtx
*
pVgCtx
=
p
;
taosArrayDestroy
(
pVgCtx
->
pTbList
);
freeSGcFileCacheCtx
(
&
pVgCtx
->
fileCtx
);
}
static
void
freeGcBlockInList
(
void
*
p
)
{
SSDataBlock
**
ppBlock
=
p
;
if
(
*
ppBlock
)
{
taosArrayDestroy
((
*
ppBlock
)
->
pDataBlock
);
taosMemoryFree
(
*
ppBlock
);
}
}
static
void
freeSGcDownstreamCtx
(
SGcDownstreamCtx
*
pCtx
)
{
taosArrayDestroy
(
pCtx
->
pNewGrpList
);
tSimpleHashCleanup
(
pCtx
->
pVgTbHash
);
taosHashCleanup
(
pCtx
->
pGrpHash
);
taosArrayDestroyEx
(
pCtx
->
pFreeBlock
,
freeGcBlockInList
);
taosHashCleanup
(
pCtx
->
pSessions
);
taosHashCleanup
(
pCtx
->
pWaitSessions
);
freeSGcFileCacheCtx
(
&
pCtx
->
fileCtx
);
}
static
void
destroyGroupCacheDownstreamCtx
(
SGroupCacheOperatorInfo
*
pGrpCacheOperator
)
{
if
(
NULL
==
pGrpCacheOperator
->
pDownstreams
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
pGrpCacheOperator
->
downstreamNum
;
++
i
)
{
SGcDownstreamCtx
*
pCtx
=
&
pGrpCacheOperator
->
pDownstreams
[
i
];
freeSGcDownstreamCtx
(
pCtx
);
}
taosMemoryFree
(
pGrpCacheOperator
->
pDownstreams
);
}
static
void
destroySGcBlkCacheInfo
(
SGcBlkCacheInfo
*
pBlkCache
)
{
taosHashCleanup
(
pBlkCache
->
pDirtyBlk
);
void
*
p
=
NULL
;
while
(
p
=
taosHashIterate
(
pBlkCache
->
pReadBlk
,
p
))
{
freeGcBlockInList
(
p
);
}
taosHashCleanup
(
pBlkCache
->
pReadBlk
);
}
static
void
destroyGroupCacheOperator
(
void
*
param
)
{
...
...
@@ -91,7 +161,12 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFree
(
pGrpCacheOperator
->
groupColsInfo
.
pColsInfo
);
taosMemoryFree
(
pGrpCacheOperator
->
groupColsInfo
.
pBuf
);
destroyGroupCacheDownstreamCtx
(
pGrpCacheOperator
);
destroySGcBlkCacheInfo
(
&
pGrpCacheOperator
->
blkCache
);
taosHashCleanup
(
pGrpCacheOperator
->
pGrpHash
);
taosMemoryFree
(
pGrpCacheOperator
->
execInfo
.
pDownstreamBlkNum
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -117,6 +192,7 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
if
(
NULL
==
pFileCtx
->
pCacheFile
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosHashSetFreeFp
(
pFileCtx
->
pCacheFile
,
freeSGroupCacheFileInfo
);
}
SGroupCacheFileInfo
*
pTmp
=
taosHashGet
(
pFileCtx
->
pCacheFile
,
&
fileId
,
sizeof
(
fileId
));
...
...
@@ -622,6 +698,7 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
if
(
NULL
==
pFileCtx
->
pCacheFile
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosHashSetFreeFp
(
pFileCtx
->
pCacheFile
,
freeSGroupCacheFileInfo
);
}
SGroupCacheFileInfo
*
pTmp
=
taosHashGet
(
pFileCtx
->
pCacheFile
,
&
fileId
,
sizeof
(
fileId
));
...
...
@@ -1091,7 +1168,6 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
static
int32_t
initGroupCacheExecInfo
(
SOperatorInfo
*
pOperator
)
{
SGroupCacheOperatorInfo
*
pInfo
=
pOperator
->
info
;
pInfo
->
execInfo
.
downstreamNum
=
pOperator
->
numOfDownstream
;
pInfo
->
execInfo
.
pDownstreamBlkNum
=
taosMemoryCalloc
(
pOperator
->
numOfDownstream
,
sizeof
(
int64_t
));
if
(
NULL
==
pInfo
->
execInfo
.
pDownstreamBlkNum
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1127,6 +1203,7 @@ static void freeRemoveGroupCacheData(void* p) {
taosArrayDestroy
(
pGroup
->
waitQueue
);
taosArrayDestroy
(
pGroup
->
blkList
.
pList
);
taosThreadMutexDestroy
(
&
pGroup
->
mutex
);
qTrace
(
"group removed"
);
}
...
...
@@ -1139,6 +1216,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if
(
NULL
==
pInfo
->
pDownstreams
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pInfo
->
downstreamNum
=
pOperator
->
numOfDownstream
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
SGcDownstreamCtx
*
pCtx
=
&
pInfo
->
pDownstreams
[
i
];
...
...
@@ -1149,6 +1227,8 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if
(
NULL
==
pCtx
->
pVgTbHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tSimpleHashSetFreeFp
(
pCtx
->
pVgTbHash
,
freeSGcVgroupCtx
);
if
(
pInfo
->
batchFetch
)
{
int32_t
defaultVg
=
0
;
SGcVgroupCtx
vgCtx
=
{
0
};
...
...
@@ -1172,6 +1252,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if
(
pCtx
->
pSessions
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosHashSetFreeFp
(
pCtx
->
pSessions
,
freeSGcSessionCtx
);
pCtx
->
pFreeBlock
=
taosArrayInit
(
10
,
POINTER_BYTES
);
if
(
NULL
==
pCtx
->
pFreeBlock
)
{
...
...
@@ -1234,7 +1315,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
setOperatorInfo
(
pOperator
,
"GroupCacheOperator"
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pInfo
->
maxCacheSize
=
1
;
pInfo
->
maxCacheSize
=
0
;
pInfo
->
grpByUid
=
pPhyciNode
->
grpByUid
;
pInfo
->
globalGrp
=
pPhyciNode
->
globalGrp
;
pInfo
->
batchFetch
=
pPhyciNode
->
batchFetch
;
...
...
source/libs/executor/src/mergejoinoperator.c
浏览文件 @
ce1b294c
...
...
@@ -301,10 +301,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo
->
rightBuildTable
=
tSimpleHashInit
(
256
,
hashFn
);
}
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doMergeJoin
,
NULL
,
destroyMergeJoinOperator
,
optrDefaultBufFn
,
NULL
,
optrDefaultGetNextExtFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
newDownstreams
)
{
taosMemoryFree
(
pDownstream
);
}
pOperator
->
numOfRealDownstream
=
newDownstreams
?
1
:
2
;
return
pOperator
;
...
...
@@ -449,7 +454,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
mergeJoinGetBlockRowsEqualTs
(
dataBlock
,
tsSlotId
,
startPos
,
timestamp
,
&
endPos
,
rowLocations
,
createdBlocks
);
while
(
endPos
==
dataBlock
->
info
.
rows
)
{
SOperatorInfo
*
ds
=
pOperator
->
pDownstream
[
whichChild
];
dataBlock
=
getNextBlockFromDownstream
(
pOperator
,
whichChild
);
dataBlock
=
getNextBlockFromDownstream
Remain
(
pOperator
,
whichChild
);
qError
(
"merge join %s got block for same ts, rows:%"
PRId64
,
whichChild
==
0
?
"left"
:
"right"
,
dataBlock
?
dataBlock
->
info
.
rows
:
0
);
if
(
whichChild
==
0
)
{
pJoinInfo
->
leftPos
=
0
;
...
...
@@ -648,6 +653,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
static
void
setMergeJoinDone
(
SOperatorInfo
*
pOperator
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
freeOperatorParam
(
pOperator
->
pDownstreamGetParams
[
0
],
OP_GET_PARAM
);
freeOperatorParam
(
pOperator
->
pDownstreamGetParams
[
1
],
OP_GET_PARAM
);
pOperator
->
pDownstreamGetParams
[
0
]
=
NULL
;
pOperator
->
pDownstreamGetParams
[
1
]
=
NULL
;
}
...
...
@@ -658,7 +665,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
if
(
pJoinInfo
->
pLeft
==
NULL
||
pJoinInfo
->
leftPos
>=
pJoinInfo
->
pLeft
->
info
.
rows
)
{
if
(
!
pJoinInfo
->
downstreamFetchDone
[
0
])
{
pJoinInfo
->
pLeft
=
getNextBlockFromDownstream
(
pOperator
,
0
);
pJoinInfo
->
pLeft
=
getNextBlockFromDownstream
Remain
(
pOperator
,
0
);
pJoinInfo
->
leftPos
=
0
;
qError
(
"merge join left got block, rows:%"
PRId64
,
pJoinInfo
->
pLeft
?
pJoinInfo
->
pLeft
->
info
.
rows
:
0
);
...
...
@@ -679,7 +686,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
if
(
pJoinInfo
->
pRight
==
NULL
||
pJoinInfo
->
rightPos
>=
pJoinInfo
->
pRight
->
info
.
rows
)
{
if
(
!
pJoinInfo
->
downstreamFetchDone
[
1
])
{
pJoinInfo
->
pRight
=
getNextBlockFromDownstream
(
pOperator
,
1
);
pJoinInfo
->
pRight
=
getNextBlockFromDownstream
Remain
(
pOperator
,
1
);
if
(
pOperator
->
pOperatorGetParam
&&
((
SSortMergeJoinOperatorParam
*
)
pOperator
->
pOperatorGetParam
->
value
)
->
initDownstreamNum
>
0
)
{
((
SSortMergeJoinOperatorParam
*
)
pOperator
->
pOperatorGetParam
->
value
)
->
initDownstreamNum
--
;
...
...
source/libs/executor/src/operator.c
浏览文件 @
ce1b294c
...
...
@@ -551,11 +551,15 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
return
pOptr
;
}
void
destroyOperator
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
==
NULL
)
{
return
;
}
freeResetOperatorParams
(
pOperator
,
OP_GET_PARAM
,
true
);
freeResetOperatorParams
(
pOperator
,
OP_NOTIFY_PARAM
,
true
);
if
(
pOperator
->
fpSet
.
closeFn
!=
NULL
)
{
pOperator
->
fpSet
.
closeFn
(
pOperator
->
info
);
}
...
...
@@ -626,9 +630,12 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
taosMemoryFree
(
pBatch
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tSimpleHashSetFreeFp
(
pBatch
->
pBatchs
,
freeExchangeGetBasicOperatorParam
);
tSimpleHashPut
(
pBatch
->
pBatchs
,
&
pDExc
->
basic
.
vgId
,
sizeof
(
pDExc
->
basic
.
vgId
),
&
pDExc
->
basic
,
sizeof
(
pDExc
->
basic
));
tSimpleHashPut
(
pBatch
->
pBatchs
,
&
pSExc
->
basic
.
vgId
,
sizeof
(
pSExc
->
basic
.
vgId
),
&
pSExc
->
basic
,
sizeof
(
pSExc
->
basic
));
destroyOperatorParamValue
(
pDst
->
value
);
taosMemoryFree
(
pDst
->
value
);
pDst
->
value
=
pBatch
;
}
else
{
taosArrayAddAll
(
pDExc
->
basic
.
uidList
,
pSExc
->
basic
.
uidList
);
...
...
@@ -668,10 +675,10 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
default:
return
TSDB_CODE_INVALID_PARA
;
}
freeResetOperatorParams
(
pOperator
,
type
,
false
);
if
(
NULL
==
pInput
)
{
*
ppParam
=
NULL
;
taosMemoryFreeClear
(
*
pppDownstramParam
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -710,31 +717,19 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
}
}
taosArrayClear
((
*
ppParam
)
->
pChildren
);
taosArrayDestroy
((
*
ppParam
)
->
pChildren
);
(
*
ppParam
)
->
pChildren
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
getNextBlockFromDownstreamImpl
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
,
bool
clearParam
)
{
if
(
pOperator
->
pDownstreamGetParams
&&
pOperator
->
pDownstreamGetParams
[
idx
])
{
qDebug
(
"DynOp: op %s start to get block from downstream %s"
,
pOperator
->
name
,
pOperator
->
pDownstream
[
idx
]
->
name
);
SSDataBlock
*
pBlock
=
pOperator
->
pDownstream
[
idx
]
->
fpSet
.
getNextExtFn
(
pOperator
->
pDownstream
[
idx
],
pOperator
->
pDownstreamGetParams
[
idx
]);
if
(
clearParam
)
{
pOperator
->
pDownstreamGetParams
[
idx
]
=
NULL
;
}
return
pBlock
;
}
return
pOperator
->
pDownstream
[
idx
]
->
fpSet
.
getNextFn
(
pOperator
->
pDownstream
[
idx
]);
}
SSDataBlock
*
getNextBlockFromDownstream
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
)
{
return
getNextBlockFromDownstreamImpl
(
pOperator
,
idx
,
fals
e
);
return
getNextBlockFromDownstreamImpl
(
pOperator
,
idx
,
tru
e
);
}
SSDataBlock
*
getNextBlockFromDownstream
Once
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
)
{
return
getNextBlockFromDownstreamImpl
(
pOperator
,
idx
,
tru
e
);
SSDataBlock
*
getNextBlockFromDownstream
Remain
(
struct
SOperatorInfo
*
pOperator
,
int32_t
idx
)
{
return
getNextBlockFromDownstreamImpl
(
pOperator
,
idx
,
fals
e
);
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ce1b294c
...
...
@@ -924,6 +924,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if
(
pOperator
->
pOperatorGetParam
)
{
pOperator
->
dynamicTask
=
true
;
int32_t
code
=
createTableListInfoFromParam
(
pOperator
);
freeOperatorParam
(
pOperator
->
pOperatorGetParam
,
OP_GET_PARAM
);
pOperator
->
pOperatorGetParam
=
NULL
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
ce1b294c
...
...
@@ -992,9 +992,11 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
pGrpCache
->
batchFetch
=
pLogicNode
->
batchFetch
;
SDataBlockDescNode
*
pChildDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
/*
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols);
}
*/
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pGrpCache
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录