Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b1c74d8f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b1c74d8f
编写于
8月 13, 2022
作者:
M
Minglei Jin
提交者:
GitHub
8月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16092 from taosdata/fix/TD-18372
fix: destroy pResList array in error cases
上级
9c979572
a3fb4064
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
55 addition
and
54 deletion
+55
-54
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+55
-54
未找到文件。
source/libs/qworker/src/qworker.c
浏览文件 @
b1c74d8f
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
...
...
@@ -7,7 +9,6 @@
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
#include "qworker.h"
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
...
...
@@ -15,7 +16,6 @@ SQWorkerMgmt gQwMgmt = {
.
qwNum
=
0
,
};
int32_t
qwProcessHbLinkBroken
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
...
...
@@ -26,7 +26,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
sch
->
hbBrokenTs
=
taosGetTimestampMs
();
if
(
qwMsg
->
connInfo
.
handle
==
sch
->
hbConnInfo
.
handle
)
{
tmsgReleaseHandle
(
&
sch
->
hbConnInfo
,
TAOS_CONN_SERVER
);
sch
->
hbConnInfo
.
handle
=
NULL
;
...
...
@@ -44,8 +44,8 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_RET
(
TSDB_CODE_SUCCESS
);
}
static
void
freeItem
(
void
*
param
)
{
SExplainExecInfo
*
pInfo
=
param
;
static
void
freeItem
(
void
*
param
)
{
SExplainExecInfo
*
pInfo
=
param
;
taosMemoryFree
(
pInfo
->
verboseInfo
);
}
...
...
@@ -54,7 +54,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if
(
TASK_TYPE_TEMP
==
ctx
->
taskType
&&
taskHandle
)
{
if
(
ctx
->
explain
)
{
SArray
*
execInfoList
=
taosArrayInit
(
4
,
sizeof
(
SExplainExecInfo
));
SArray
*
execInfoList
=
taosArrayInit
(
4
,
sizeof
(
SExplainExecInfo
));
QW_ERR_RET
(
qGetExplainExecInfo
(
taskHandle
,
execInfoList
));
SRpcHandleInfo
connInfo
=
ctx
->
ctrlConnInfo
;
...
...
@@ -81,7 +81,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
SArray
*
pResList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
SArray
*
pResList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
while
(
true
)
{
QW_TASK_DLOG
(
"start to execTask, loopIdx:%d"
,
i
++
);
...
...
@@ -95,7 +95,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
}
else
{
QW_TASK_DLOG
(
"qExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
}
QW_ERR_RET
(
code
);
QW_ERR_
J
RET
(
code
);
}
}
...
...
@@ -105,7 +105,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG
(
"qExecTask end with empty res, useconds:%"
PRIu64
,
useconds
);
dsEndPut
(
sinkHandle
,
useconds
);
QW_ERR_RET
(
qwHandleTaskComplete
(
QW_FPARAMS
(),
ctx
));
QW_ERR_
J
RET
(
qwHandleTaskComplete
(
QW_FPARAMS
(),
ctx
));
if
(
queryStop
)
{
*
queryStop
=
true
;
...
...
@@ -114,7 +114,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
break
;
}
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pResList
);
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pResList
);
++
j
)
{
SSDataBlock
*
pRes
=
taosArrayGetP
(
pResList
,
j
);
ASSERT
(
pRes
->
info
.
rows
>
0
);
...
...
@@ -122,7 +122,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
code
=
dsPutDataBlock
(
sinkHandle
,
&
inputData
,
&
qcontinue
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
QW_ERR_RET
(
code
);
QW_ERR_
J
RET
(
code
);
}
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
pRes
->
info
.
rows
,
qcontinue
);
...
...
@@ -132,7 +132,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if
(
queryStop
)
{
*
queryStop
=
true
;
}
break
;
}
...
...
@@ -151,6 +151,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
taosArrayDestroy
(
pResList
);
QW_RET
(
code
);
_return:
taosArrayDestroy
(
pResList
);
return
code
;
}
int32_t
qwGenerateSchHbRsp
(
SQWorker
*
mgmt
,
SQWSchStatus
*
sch
,
SQWHbInfo
*
hbInfo
)
{
...
...
@@ -222,7 +227,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET
(
code
);
}
QW_TASK_DLOG
(
"no more data in sink and query end, fetched blocks %d rows %d"
,
pOutput
->
numOfBlocks
,
pOutput
->
numOfRows
);
QW_TASK_DLOG
(
"no more data in sink and query end, fetched blocks %d rows %d"
,
pOutput
->
numOfBlocks
,
pOutput
->
numOfRows
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
);
if
(
NULL
==
rsp
)
{
...
...
@@ -266,7 +272,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput
->
numOfBlocks
++
;
if
(
DS_BUF_EMPTY
==
pOutput
->
bufStatus
&&
pOutput
->
queryEnd
)
{
QW_TASK_DLOG
(
"task all data fetched and done, fetched blocks %d rows %d"
,
pOutput
->
numOfBlocks
,
pOutput
->
numOfRows
);
QW_TASK_DLOG
(
"task all data fetched and done, fetched blocks %d rows %d"
,
pOutput
->
numOfBlocks
,
pOutput
->
numOfRows
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
);
break
;
}
...
...
@@ -288,10 +295,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
int32_t
qwGetDeleteResFromSink
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
SDeleteRes
*
pRes
)
{
int64_t
len
=
0
;
bool
queryEnd
=
false
;
int32_t
code
=
0
;
SOutputData
output
=
{
0
};
int64_t
len
=
0
;
bool
queryEnd
=
false
;
int32_t
code
=
0
;
SOutputData
output
=
{
0
};
dsGetDataLength
(
ctx
->
sinkHandle
,
&
len
,
&
queryEnd
);
...
...
@@ -304,7 +311,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
if
(
NULL
==
output
.
pData
)
{
QW_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
&
output
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
...
...
@@ -312,8 +319,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
QW_ERR_RET
(
code
);
}
SDeleterRes
*
pDelRes
=
(
SDeleterRes
*
)
output
.
pData
;
SDeleterRes
*
pDelRes
=
(
SDeleterRes
*
)
output
.
pData
;
pRes
->
suid
=
pDelRes
->
suid
;
pRes
->
uidList
=
pDelRes
->
uidList
;
pRes
->
skey
=
pDelRes
->
skey
;
...
...
@@ -322,14 +329,13 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
strcpy
(
pRes
->
tableFName
,
pDelRes
->
tableName
);
strcpy
(
pRes
->
tsColName
,
pDelRes
->
tsColName
);
taosMemoryFree
(
output
.
pData
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwHandlePrePhaseEvents
(
QW_FPARAMS_DEF
,
int8_t
phase
,
SQWPhaseInput
*
input
,
SQWPhaseOutput
*
output
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_TASK_DLOG
(
"start to handle event at phase %s"
,
qwPhaseStr
(
phase
));
...
...
@@ -355,8 +361,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
//
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
break
;
...
...
@@ -391,8 +397,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
//
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
}
...
...
@@ -428,9 +434,9 @@ _return:
}
int32_t
qwHandlePostPhaseEvents
(
QW_FPARAMS_DEF
,
int8_t
phase
,
SQWPhaseInput
*
input
,
SQWPhaseOutput
*
output
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SRpcHandleInfo
connInfo
=
{
0
};
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SRpcHandleInfo
connInfo
=
{
0
};
QW_TASK_DLOG
(
"start to handle event at phase %s"
,
qwPhaseStr
(
phase
));
...
...
@@ -449,8 +455,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
//
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
...
...
@@ -473,14 +479,14 @@ _return:
if
(
QW_PHASE_POST_QUERY
==
phase
&&
ctx
)
{
ctx
->
queryRsped
=
true
;
bool
rsped
=
false
;
bool
rsped
=
false
;
SQWMsg
qwMsg
=
{.
msgType
=
ctx
->
msgType
,
.
connInfo
=
ctx
->
ctrlConnInfo
};
qwDbgSimulateRedirect
(
&
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
if
(
!
rsped
)
{
qwBuildAndSendQueryRsp
(
input
->
msgType
+
1
,
&
ctx
->
ctrlConnInfo
,
code
,
ctx
);
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
code
,
tstrerror
(
code
));
}
}
}
if
(
ctx
)
{
...
...
@@ -507,7 +513,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwPreprocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
...
...
@@ -537,8 +542,7 @@ _return:
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
char
*
sql
)
{
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
char
*
sql
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
...
...
@@ -556,7 +560,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
ctx
->
needFetch
=
qwMsg
->
msgInfo
.
needFetch
;
ctx
->
msgType
=
qwMsg
->
msgType
;
//QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
//
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code
=
qStringToSubplan
(
qwMsg
->
msg
,
&
plan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -594,7 +598,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
_return:
taosMemoryFree
(
sql
);
input
.
code
=
code
;
input
.
msgType
=
qwMsg
->
msgType
;
code
=
qwHandlePostPhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_POST_QUERY
,
&
input
,
NULL
);
...
...
@@ -648,7 +652,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwBuildAndSendFetchRsp
(
ctx
->
fetchType
,
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
rsp
=
NULL
;
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
}
else
{
...
...
@@ -754,13 +758,13 @@ _return:
if
(
code
||
rsp
)
{
bool
rsped
=
false
;
if
(
ctx
)
{
qwDbgSimulateRedirect
(
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateRedirect
(
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
}
if
(
!
rsped
)
{
qwBuildAndSendFetchRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
QW_TASK_DLOG
(
"%s send, handle:%p, code:%x - %s, dataLen:%d"
,
TMSG_INFO
(
qwMsg
->
msgType
+
1
),
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
QW_TASK_DLOG
(
"%s send, handle:%p, code:%x - %s, dataLen:%d"
,
TMSG_INFO
(
qwMsg
->
msgType
+
1
),
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
}
}
...
...
@@ -919,10 +923,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
uint64_t
*
sId
=
taosHashGetKey
(
pIter
,
NULL
);
QW_TLOG
(
"cancel send hb to sch %"
PRIx64
" cause of no connection handle"
,
*
sId
);
if
(
sch
->
hbBrokenTs
>
0
&&
((
currentMs
-
sch
->
hbBrokenTs
)
>
QW_SCH_TIMEOUT_MSEC
)
&&
taosHashGetSize
(
sch
->
tasksHash
)
<=
0
)
{
if
(
sch
->
hbBrokenTs
>
0
&&
((
currentMs
-
sch
->
hbBrokenTs
)
>
QW_SCH_TIMEOUT_MSEC
)
&&
taosHashGetSize
(
sch
->
tasksHash
)
<=
0
)
{
taosArrayPush
(
pExpiredSch
,
sId
);
}
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
pIter
);
continue
;
}
...
...
@@ -998,7 +1003,6 @@ _return:
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qWorkerInit
(
int8_t
nodeType
,
int32_t
nodeId
,
SQWorkerCfg
*
cfg
,
void
**
qWorkerMgmt
,
const
SMsgCb
*
pMsgCb
)
{
if
(
NULL
==
qWorkerMgmt
||
pMsgCb
->
mgmt
==
NULL
)
{
qError
(
"invalid param to init qworker"
);
...
...
@@ -1119,12 +1123,12 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
QW_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SDataSinkStat
sinkStat
=
{
0
};
dsDataSinkGetCacheSize
(
&
sinkStat
);
pStat
->
cacheDataSize
=
sinkStat
.
cachedSize
;
pStat
->
queryProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
queryProcessed
);
pStat
->
cqueryProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
cqueryProcessed
);
pStat
->
fetchProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
fetchProcessed
);
...
...
@@ -1139,6 +1143,3 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return
TSDB_CODE_SUCCESS
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录