diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 95415e1113f0fe807157ac5a0e6509caef97def6..ea2180124a074b1512351ccc5a63038266088a59 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -480,6 +480,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { if (ret != TSDB_CODE_SUCCESS) { pTaskInfo->code = ret; cleanUpUdfs(); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); atomic_store_64(&pTaskInfo->owner, 0); @@ -512,8 +513,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { } cleanUpUdfs(); - uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; + uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 61c38f59db2d08e960100e1b615bdf8f0a104127..fbda8ee37fbb081697d9ce04b11b91151166b277 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,6 +9,7 @@ #include "tcommon.h" #include "tmsg.h" #include "tname.h" +#include "tdatablock.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; +static void freeBlock(void* param) { + SSDataBlock* pBlock = *(SSDataBlock**)param; + blockDataDestroy(pBlock); +} + int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; @@ -71,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } - +\ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; @@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); + code = qExecTaskOpt(taskHandle, pResList, &useconds); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } _return: - - taosArrayDestroy(pResList); + taosArrayDestroyEx(pResList, freeBlock); QW_RET(code); } @@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { void *pIter = taosHashIterate(mgmt->schHash, NULL); while (pIter) { - SQWSchStatus *sch = (SQWSchStatus *)pIter; - if (NULL == sch->hbConnInfo.handle) { + SQWSchStatus *sch1 = (SQWSchStatus *)pIter; + if (NULL == sch1->hbConnInfo.handle) { uint64_t *sId = taosHashGetKey(pIter, NULL); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); - if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && - taosHashGetSize(sch->tasksHash) <= 0) { + if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && + taosHashGetSize(sch1->tasksHash) <= 0) { taosArrayPush(pExpiredSch, sId); }