提交 04cd42a8 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/3_liaohj' into feature/3_liaohj

...@@ -480,6 +480,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { ...@@ -480,6 +480,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pTaskInfo->code = ret; pTaskInfo->code = ret;
cleanUpUdfs(); cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
...@@ -512,8 +513,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { ...@@ -512,8 +513,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
} }
cleanUpUdfs(); cleanUpUdfs();
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "tdatablock.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
...@@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = { ...@@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
static void freeBlock(void* param) {
SSDataBlock* pBlock = *(SSDataBlock**)param;
blockDataDestroy(pBlock);
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -71,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -71,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
...@@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now // if *taskHandle is NULL, it's killed right now
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds); code = qExecTaskOpt(taskHandle, pResList, &useconds);
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
...@@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
_return: _return:
taosArrayDestroyEx(pResList, freeBlock);
taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
} }
...@@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
void *pIter = taosHashIterate(mgmt->schHash, NULL); void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) { while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
if (NULL == sch->hbConnInfo.handle) { if (NULL == sch1->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL); uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch->tasksHash) <= 0) { taosHashGetSize(sch1->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId); taosArrayPush(pExpiredSch, sId);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册