From 179709f814856661fdf883775a1ca2055c3d6abb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Jul 2022 17:37:40 +0800 Subject: [PATCH] fix(query): assign the subplan to belong to task struct --- source/libs/executor/inc/executorimpl.h | 55 ++++++++++---------- source/libs/executor/src/cachescanoperator.c | 6 +-- source/libs/executor/src/executor.c | 16 +++--- source/libs/executor/src/executorimpl.c | 31 ++++++----- source/libs/qworker/inc/qwInt.h | 1 - source/libs/qworker/src/qwUtil.c | 6 --- source/libs/qworker/src/qworker.c | 4 -- 7 files changed, 55 insertions(+), 64 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a7e033e547..d7c283c70d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -159,27 +159,28 @@ typedef struct { int64_t recoverEndVer; } SStreamTaskInfo; +typedef struct { + char* tablename; + char* dbname; + int32_t tversion; + SSchemaWrapper* sw; +} SSchemaInfo; + typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - - SStreamTaskInfo streamInfo; - - struct { - char* tablename; - char* dbname; - int32_t tversion; - SSchemaWrapper* sw; - } schemaVer; - - STableListInfo tableqinfoList; // this is a table list - const char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + + SStreamTaskInfo streamInfo; + SSchemaInfo schemaInfo; + STableListInfo tableqinfoList; // this is a table list + const char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SSubplan* pSubplan; struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -248,13 +249,13 @@ typedef struct SLoadRemoteDataInfo { } SLoadRemoteDataInfo; typedef struct SLimitInfo { - SLimit limit; - SLimit slimit; - uint64_t currentGroupId; - int64_t remainGroupOffset; - int64_t numOfOutputGroups; - int64_t remainOffset; - int64_t numOfOutputRows; + SLimit limit; + SLimit slimit; + uint64_t currentGroupId; + int64_t remainGroupOffset; + int64_t numOfOutputGroups; + int64_t remainOffset; + int64_t numOfOutputRows; } SLimitInfo; typedef struct SExchangeInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 395b42aa86..c46485a332 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -227,14 +227,14 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf for (int32_t i = 0; i < numOfCols; ++i) { SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); - for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && + for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) { + if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->targetSlotId] = -1; break; } - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { + if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) { (*pSlotIds)[pColMatch->targetSlotId] = j; break; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a2022e2ef2..0bf294f33d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -138,7 +138,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } } - nodesDestroyNode((SNode*)pPlan); return pTaskInfo; } @@ -165,7 +164,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { return NULL; } - nodesDestroyNode((SNode*)pPlan); return pTaskInfo; } @@ -243,19 +241,19 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - if (pTaskInfo->schemaVer.sw == NULL) { + if (pTaskInfo->schemaInfo.sw == NULL) { return TSDB_CODE_SUCCESS; } - *sversion = pTaskInfo->schemaVer.sw->version; - *tversion = pTaskInfo->schemaVer.tversion; - if (pTaskInfo->schemaVer.dbname) { - strcpy(dbName, pTaskInfo->schemaVer.dbname); + *sversion = pTaskInfo->schemaInfo.sw->version; + *tversion = pTaskInfo->schemaInfo.tversion; + if (pTaskInfo->schemaInfo.dbname) { + strcpy(dbName, pTaskInfo->schemaInfo.dbname); } else { dbName[0] = 0; } - if (pTaskInfo->schemaVer.tablename) { - strcpy(tableName, pTaskInfo->schemaVer.tablename); + if (pTaskInfo->schemaInfo.tablename) { + strcpy(tableName, pTaskInfo->schemaInfo.tablename); } else { tableName[0] = 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 06bb096e59..b04916e008 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4127,7 +4127,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTaskInfo->schemaVer.dbname = strdup(dbFName); + pTaskInfo->schemaInfo.dbname = strdup(dbFName); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; @@ -4153,35 +4153,35 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo return terrno; } - pTaskInfo->schemaVer.tablename = strdup(mr.me.name); + pTaskInfo->schemaInfo.tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } metaReaderClear(&mr); return TSDB_CODE_SUCCESS; } -static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) { - taosMemoryFreeClear(pTaskInfo->schemaVer.dbname); - if (pTaskInfo->schemaVer.sw == NULL) { +static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { + taosMemoryFreeClear(pSchemaInfo->dbname); + if (pSchemaInfo->sw == NULL) { return; } - taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema); - taosMemoryFree(pTaskInfo->schemaVer.sw); - taosMemoryFree(pTaskInfo->schemaVer.tablename); + taosMemoryFree(pSchemaInfo->tablename); + taosMemoryFree(pSchemaInfo->sw->pSchema); + taosMemoryFree(pSchemaInfo->sw); } static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { @@ -4939,6 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } (*pTaskInfo)->sql = sql; + (*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { @@ -4977,7 +4978,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { doDestroyTableList(&pTaskInfo->tableqinfoList); destroyOperatorInfo(pTaskInfo->pRoot); - cleanupTableSchemaInfo(pTaskInfo); + cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); + + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index c8e5204e91..8f036714c9 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -150,7 +150,6 @@ typedef struct SQWTaskCtx { void *taskHandle; void *sinkHandle; - SSubplan *plan; STbVerInfo tbInfo; } SQWTaskCtx; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index b56cb29628..c7bf7ab7e7 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -306,11 +306,6 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { dsDestroyDataSinker(ctx->sinkHandle); ctx->sinkHandle = NULL; } - - if (ctx->plan) { - nodesDestroyNode((SNode*)ctx->plan); - ctx->plan = NULL; - } } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { @@ -327,7 +322,6 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); - atomic_store_ptr(&ctx->plan, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e99695e962..ebccb7950c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -522,8 +522,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { QW_ERR_JRET(code); } - ctx->plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -928,8 +926,6 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { QW_ERR_JRET(code); } - ctx.plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); -- GitLab