提交 179709f8 编写于 作者: H Haojun Liao

fix(query): assign the subplan to belong to task struct

上级 79b0240a
...@@ -159,27 +159,28 @@ typedef struct { ...@@ -159,27 +159,28 @@ typedef struct {
int64_t recoverEndVer; int64_t recoverEndVer;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
} SSchemaInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
struct { STableListInfo tableqinfoList; // this is a table list
char* tablename; const char* sql; // query sql string
char* dbname; jmp_buf env; // jump to this position when error happens.
int32_t tversion; EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSchemaWrapper* sw; SSubplan* pSubplan;
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
...@@ -248,13 +249,13 @@ typedef struct SLoadRemoteDataInfo { ...@@ -248,13 +249,13 @@ typedef struct SLoadRemoteDataInfo {
} SLoadRemoteDataInfo; } SLoadRemoteDataInfo;
typedef struct SLimitInfo { typedef struct SLimitInfo {
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
uint64_t currentGroupId; uint64_t currentGroupId;
int64_t remainGroupOffset; int64_t remainGroupOffset;
int64_t numOfOutputGroups; int64_t numOfOutputGroups;
int64_t remainOffset; int64_t remainOffset;
int64_t numOfOutputRows; int64_t numOfOutputRows;
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
......
...@@ -227,14 +227,14 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf ...@@ -227,14 +227,14 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);
for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId &&
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->targetSlotId] = -1; (*pSlotIds)[pColMatch->targetSlotId] = -1;
break; break;
} }
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) {
(*pSlotIds)[pColMatch->targetSlotId] = j; (*pSlotIds)[pColMatch->targetSlotId] = j;
break; break;
} }
......
...@@ -138,7 +138,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -138,7 +138,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
} }
} }
nodesDestroyNode((SNode*)pPlan);
return pTaskInfo; return pTaskInfo;
} }
...@@ -165,7 +164,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { ...@@ -165,7 +164,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return NULL; return NULL;
} }
nodesDestroyNode((SNode*)pPlan);
return pTaskInfo; return pTaskInfo;
} }
...@@ -243,19 +241,19 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ...@@ -243,19 +241,19 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo->schemaVer.sw == NULL) { if (pTaskInfo->schemaInfo.sw == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*sversion = pTaskInfo->schemaVer.sw->version; *sversion = pTaskInfo->schemaInfo.sw->version;
*tversion = pTaskInfo->schemaVer.tversion; *tversion = pTaskInfo->schemaInfo.tversion;
if (pTaskInfo->schemaVer.dbname) { if (pTaskInfo->schemaInfo.dbname) {
strcpy(dbName, pTaskInfo->schemaVer.dbname); strcpy(dbName, pTaskInfo->schemaInfo.dbname);
} else { } else {
dbName[0] = 0; dbName[0] = 0;
} }
if (pTaskInfo->schemaVer.tablename) { if (pTaskInfo->schemaInfo.tablename) {
strcpy(tableName, pTaskInfo->schemaVer.tablename); strcpy(tableName, pTaskInfo->schemaInfo.tablename);
} else { } else {
tableName[0] = 0; tableName[0] = 0;
} }
......
...@@ -4127,7 +4127,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4127,7 +4127,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->schemaVer.dbname = strdup(dbFName); pTaskInfo->schemaInfo.dbname = strdup(dbFName);
pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model; pTaskInfo->execModel = model;
...@@ -4153,35 +4153,35 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4153,35 +4153,35 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
return terrno; return terrno;
} }
pTaskInfo->schemaVer.tablename = strdup(mr.me.name); pTaskInfo->schemaInfo.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) { if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid; tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else { } else {
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) { static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pTaskInfo->schemaVer.dbname); taosMemoryFreeClear(pSchemaInfo->dbname);
if (pTaskInfo->schemaVer.sw == NULL) { if (pSchemaInfo->sw == NULL) {
return; return;
} }
taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema); taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFree(pTaskInfo->schemaVer.sw); taosMemoryFree(pSchemaInfo->sw->pSchema);
taosMemoryFree(pTaskInfo->schemaVer.tablename); taosMemoryFree(pSchemaInfo->sw);
} }
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
...@@ -4939,6 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4939,6 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
} }
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
...@@ -4977,7 +4978,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -4977,7 +4978,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList); doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(pTaskInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
......
...@@ -150,7 +150,6 @@ typedef struct SQWTaskCtx { ...@@ -150,7 +150,6 @@ typedef struct SQWTaskCtx {
void *taskHandle; void *taskHandle;
void *sinkHandle; void *sinkHandle;
SSubplan *plan;
STbVerInfo tbInfo; STbVerInfo tbInfo;
} SQWTaskCtx; } SQWTaskCtx;
......
...@@ -306,11 +306,6 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { ...@@ -306,11 +306,6 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
dsDestroyDataSinker(ctx->sinkHandle); dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL; ctx->sinkHandle = NULL;
} }
if (ctx->plan) {
nodesDestroyNode((SNode*)ctx->plan);
ctx->plan = NULL;
}
} }
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
...@@ -327,7 +322,6 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -327,7 +322,6 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL);
atomic_store_ptr(&ctx->plan, NULL);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
......
...@@ -522,8 +522,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ...@@ -522,8 +522,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
ctx->plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
...@@ -928,8 +926,6 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { ...@@ -928,8 +926,6 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
ctx.plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册