Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ed573475
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ed573475
编写于
7月 30, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): fix memory leak.
上级
1e391ad2
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
41 addition
and
131 deletion
+41
-131
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+0
-56
source/common/src/tname.c
source/common/src/tname.c
+0
-28
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-4
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+2
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+12
-25
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+9
-4
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+1
-1
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+4
-4
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+10
-4
未找到文件。
include/libs/executor/executor.h
浏览文件 @
ed573475
...
...
@@ -155,7 +155,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void
qProcessRspMsg
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
);
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
SArray
*
pExecInfoList
/*,int32_t* resNum, SExplainExecInfo** pRes*/
);
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
);
...
...
source/common/src/tdatablock.c
浏览文件 @
ed573475
...
...
@@ -16,65 +16,9 @@
#define _DEFAULT_SOURCE
#include "tdatablock.h"
#include "tcompare.h"
#include "tglobal.h"
#include "tlog.h"
#include "tname.h"
int32_t
taosGetFqdnPortFromEp
(
const
char
*
ep
,
SEp
*
pEp
)
{
pEp
->
port
=
0
;
strcpy
(
pEp
->
fqdn
,
ep
);
char
*
temp
=
strchr
(
pEp
->
fqdn
,
':'
);
if
(
temp
)
{
*
temp
=
0
;
pEp
->
port
=
atoi
(
temp
+
1
);
}
if
(
pEp
->
port
==
0
)
{
pEp
->
port
=
tsServerPort
;
}
return
0
;
}
void
addEpIntoEpSet
(
SEpSet
*
pEpSet
,
const
char
*
fqdn
,
uint16_t
port
)
{
if
(
pEpSet
==
NULL
||
fqdn
==
NULL
||
strlen
(
fqdn
)
==
0
)
{
return
;
}
int32_t
index
=
pEpSet
->
numOfEps
;
tstrncpy
(
pEpSet
->
eps
[
index
].
fqdn
,
fqdn
,
tListLen
(
pEpSet
->
eps
[
index
].
fqdn
));
pEpSet
->
eps
[
index
].
port
=
port
;
pEpSet
->
numOfEps
+=
1
;
}
bool
isEpsetEqual
(
const
SEpSet
*
s1
,
const
SEpSet
*
s2
)
{
if
(
s1
->
numOfEps
!=
s2
->
numOfEps
||
s1
->
inUse
!=
s2
->
inUse
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
s1
->
numOfEps
;
i
++
)
{
if
(
s1
->
eps
[
i
].
port
!=
s2
->
eps
[
i
].
port
||
strncmp
(
s1
->
eps
[
i
].
fqdn
,
s2
->
eps
[
i
].
fqdn
,
TSDB_FQDN_LEN
)
!=
0
)
return
false
;
}
return
true
;
}
void
updateEpSet_s
(
SCorEpSet
*
pEpSet
,
SEpSet
*
pNewEpSet
)
{
taosCorBeginWrite
(
&
pEpSet
->
version
);
pEpSet
->
epSet
=
*
pNewEpSet
;
taosCorEndWrite
(
&
pEpSet
->
version
);
}
SEpSet
getEpSet_s
(
SCorEpSet
*
pEpSet
)
{
SEpSet
ep
=
{
0
};
taosCorBeginRead
(
&
pEpSet
->
version
);
ep
=
pEpSet
->
epSet
;
taosCorEndRead
(
&
pEpSet
->
version
);
return
ep
;
}
int32_t
colDataGetLength
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
...
...
source/common/src/tname.c
浏览文件 @
ed573475
...
...
@@ -20,34 +20,6 @@
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
bool
tscValidateTableNameLength
(
size_t
len
)
{
return
len
<
TSDB_TABLE_NAME_LEN
;
}
#if 0
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0 || src == NULL) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = taosMemoryCalloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) taosMemoryCalloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == 0 && src->upperRelOptr == 0));
return pFilter;
}
#endif
#if 0
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
ed573475
...
...
@@ -855,7 +855,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
int32_t
extractDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
SLoadRemoteDataInfo
*
pLoadInfo
,
int32_t
numOfRows
,
char
*
pData
,
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
STimeWindow
getAlignQueryTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
int64_t
key
);
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
);
...
...
@@ -986,9 +985,8 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
const
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SExplainExecInfo
**
pRes
,
int32_t
*
capacity
,
int32_t
*
resNum
);
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
int32_t
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
char
*
result
);
int32_t
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
ed573475
...
...
@@ -496,11 +496,9 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
doDestroyTask
(
pTaskInfo
);
}
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
)
{
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
SArray
*
pExecInfoList
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
capacity
=
0
;
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pExecInfoList
);
}
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ed573475
...
...
@@ -4613,42 +4613,29 @@ void releaseQueryBuf(size_t numOfTables) {
atomic_add_fetch_64
(
&
tsQueryBufferSizeBytes
,
t
);
}
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SExplainExecInfo
**
pRes
,
int32_t
*
capacity
,
int32_t
*
resNum
)
{
if
(
*
resNum
>=
*
capacity
)
{
*
capacity
+=
10
;
*
pRes
=
taosMemoryRealloc
(
*
pRes
,
(
*
capacity
)
*
sizeof
(
SExplainExecInfo
));
if
(
NULL
==
*
pRes
)
{
qError
(
"malloc %d failed"
,
(
*
capacity
)
*
(
int32_t
)
sizeof
(
SExplainExecInfo
));
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
SExplainExecInfo
*
pInfo
=
&
(
*
pRes
)[
*
resNum
];
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
)
{
SExplainExecInfo
execInfo
=
{
0
};
SExplainExecInfo
*
pExplainInfo
=
taosArrayPush
(
pExecInfoList
,
&
execInfo
);
pInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
pInfo
->
startupCost
=
operatorInfo
->
cost
.
openCost
;
pInfo
->
totalCost
=
operatorInfo
->
cost
.
totalCost
;
pExplainInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
pExplainInfo
->
startupCost
=
operatorInfo
->
cost
.
openCost
;
pExplainInfo
->
totalCost
=
operatorInfo
->
cost
.
totalCost
;
pExplainInfo
->
verboseLen
=
0
;
pExplainInfo
->
verboseInfo
=
NULL
;
if
(
operatorInfo
->
fpSet
.
getExplainFn
)
{
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
p
Info
->
verboseInfo
,
&
p
Info
->
verboseLen
);
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
p
ExplainInfo
->
verboseInfo
,
&
pExplain
Info
->
verboseLen
);
if
(
code
)
{
qError
(
"%s operator getExplainFn failed, code:%s"
,
GET_TASKID
(
operatorInfo
->
pTaskInfo
),
tstrerror
(
code
));
return
code
;
}
}
else
{
pInfo
->
verboseLen
=
0
;
pInfo
->
verboseInfo
=
NULL
;
}
++
(
*
resNum
);
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
operatorInfo
->
numOfDownstream
;
++
i
)
{
code
=
getOperatorExplainExecInfo
(
operatorInfo
->
pDownstream
[
i
],
p
Res
,
capacity
,
resNum
);
if
(
code
)
{
taosMemoryFreeClear
(
*
pRes
);
code
=
getOperatorExplainExecInfo
(
operatorInfo
->
pDownstream
[
i
],
p
ExecInfoList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
//
taosMemoryFreeClear(*pRes);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
ed573475
...
...
@@ -31,14 +31,21 @@ static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity
static
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SAggSupporter
*
pAggSup
);
static
void
freeGroupKey
(
void
*
param
)
{
SGroupKeys
*
pKey
=
(
SGroupKeys
*
)
param
;
taosMemoryFree
(
pKey
->
pData
);
}
static
void
destroyGroupOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
(
SGroupbyOperatorInfo
*
)
param
;
cleanupBasicInfo
(
&
pInfo
->
binfo
);
taosMemoryFreeClear
(
pInfo
->
keyBuf
);
taosArrayDestroy
(
pInfo
->
pGroupCols
);
taosArrayDestroy
(
pInfo
->
pGroupColVals
);
taosArrayDestroy
Ex
(
pInfo
->
pGroupColVals
,
freeGroupKey
);
cleanupExprSupp
(
&
pInfo
->
scalarSup
);
cleanupGroupResInfo
(
&
pInfo
->
groupResInfo
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -414,8 +421,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = OP_Groupby;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
ed573475
...
...
@@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
);
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
int32_t
qwBuildAndSendQueryRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SQWTaskCtx
*
ctx
);
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
S
ExplainExecInfo
*
execInfo
,
int32_t
num
);
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
S
Array
*
pExecList
);
int32_t
qwBuildAndSendErrorRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
ed573475
...
...
@@ -82,8 +82,9 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SExplainExecInfo
*
execInfo
,
int32_t
num
)
{
SExplainRsp
rsp
=
{.
numOfPlans
=
num
,
.
subplanInfo
=
execInfo
};
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SArray
*
pExecList
)
{
SExplainExecInfo
*
pInfo
=
taosArrayGet
(
pExecList
,
0
);
SExplainRsp
rsp
=
{.
numOfPlans
=
taosArrayGetSize
(
pExecList
),
.
subplanInfo
=
pInfo
};
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
...
...
@@ -96,10 +97,9 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
.
code
=
0
,
.
info
=
*
pConn
,
};
rpcRsp
.
info
.
ahandle
=
NULL
;
rpcRsp
.
info
.
ahandle
=
NULL
;
tmsgSendRsp
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
ed573475
...
...
@@ -44,18 +44,24 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_RET
(
TSDB_CODE_SUCCESS
);
}
static
void
freeItem
(
void
*
param
)
{
SExplainExecInfo
*
pInfo
=
param
;
taosMemoryFree
(
pInfo
->
verboseInfo
);
}
int32_t
qwHandleTaskComplete
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
if
(
TASK_TYPE_TEMP
==
ctx
->
taskType
&&
taskHandle
)
{
if
(
ctx
->
explain
)
{
SExplainExecInfo
*
execInfo
=
NULL
;
int32_t
resNum
=
0
;
QW_ERR_RET
(
qGetExplainExecInfo
(
taskHandle
,
&
resNum
,
&
execInfo
));
SArray
*
execInfoList
=
taosArrayInit
(
4
,
sizeof
(
SExplainExecInfo
));
QW_ERR_RET
(
qGetExplainExecInfo
(
taskHandle
,
execInfoList
));
SRpcHandleInfo
connInfo
=
ctx
->
ctrlConnInfo
;
connInfo
.
ahandle
=
NULL
;
QW_ERR_RET
(
qwBuildAndSendExplainRsp
(
&
connInfo
,
execInfo
,
resNum
));
int32_t
code
=
qwBuildAndSendExplainRsp
(
&
connInfo
,
execInfoList
);
taosArrayDestroyEx
(
execInfoList
,
freeItem
);
QW_ERR_RET
(
code
);
}
if
(
!
ctx
->
needFetch
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录