diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1e7847b4e89d402e2fc99935615243dcd78bc5cb..6fcd6e770907bda3e45e5a4bf068fc53a02b6e92 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1605,6 +1605,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc // TDMT_VND_DROP_TABLE ================= typedef struct { const char* name; + int8_t igNotExists; } SVDropTbReq; typedef struct { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 84390e14c48c0c726895be6f6f7b7fd5b9188a2e..ebb7391a2ba1a6dc20f510bbe7c5479d3ad04689 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -616,6 +616,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637) #define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638) #define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639) +#define TSDB_CODE_PAR_INVALID_DROP_STABLE TAOS_DEF_ERROR_CODE(0, 0x263A) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7abe0f29c12365986d2835a4d79b2c54dfc0e1d0..578a63887f02b669357521b43248c0264dda0937 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3800,6 +3800,7 @@ static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; + if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1; tEndEncode(pCoder); return 0; @@ -3809,6 +3810,7 @@ static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1; tEndDecode(pCoder); return 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5f7dc5de9015a2b9aad7a54688afc04e38e0bcda..6682617f1cf53ed00f131a010cf7670b7878c2ef 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3134,11 +3134,11 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) { return code; } -typedef struct SVgroupTablesBatch { +typedef struct SVgroupCreateTableBatch { SVCreateTbBatchReq req; SVgroupInfo info; char dbName[TSDB_DB_NAME_LEN]; -} SVgroupTablesBatch; +} SVgroupCreateTableBatch; static void destroyCreateTbReq(SVCreateTbReq* pReq) { taosMemoryFreeClear(pReq->name); @@ -3146,7 +3146,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) { } static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo, - SVgroupTablesBatch* pBatch) { + SVgroupCreateTableBatch* pBatch) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; strcpy(name.dbname, pStmt->dbName); @@ -3180,13 +3180,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_SUCCESS; } -static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { +static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) { int tlen; SCoder coder = {0}; int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret); - tlen += sizeof(SMsgHead); //+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); + tlen += sizeof(SMsgHead); void* buf = taosMemoryMalloc(tlen); if (NULL == buf) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3212,7 +3212,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* return TSDB_CODE_SUCCESS; } -static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { +static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { size_t size = taosArrayGetSize(pTbBatch->req.pArray); for (int32_t i = 0; i < size; ++i) { SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i); @@ -3257,10 +3257,10 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } - SVgroupTablesBatch tbatch = {0}; - int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch); + SVgroupCreateTableBatch tbatch = {0}; + int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch); if (TSDB_CODE_SUCCESS == code) { - code = serializeVgroupTablesBatch(&tbatch, *pBufArray); + code = serializeVgroupCreateTableBatch(&tbatch, *pBufArray); } destroyCreateTbReqBatch(&tbatch); @@ -3305,9 +3305,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c req.ctb.suid = suid; req.ctb.pTag = row; - SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); + SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); if (pTableBatch == NULL) { - SVgroupTablesBatch tBatch = {0}; + SVgroupCreateTableBatch tBatch = {0}; tBatch.info = *pVgInfo; strcpy(tBatch.dbName, pDbName); @@ -3480,21 +3480,21 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla return code; } -static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHashmap) { +static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) { SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); if (NULL == pBufArray) { return NULL; } - int32_t code = TSDB_CODE_SUCCESS; - SVgroupTablesBatch* pTbBatch = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SVgroupCreateTableBatch* pTbBatch = NULL; do { pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); if (pTbBatch == NULL) { break; } - serializeVgroupTablesBatch(pTbBatch, pBufArray); + serializeVgroupCreateTableBatch(pTbBatch, pBufArray); destroyCreateTbReqBatch(pTbBatch); } while (true); @@ -3519,7 +3519,143 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) } } - SArray* pBufArray = serializeVgroupsTablesBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); + SArray* pBufArray = serializeVgroupsCreateTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); + taosHashCleanup(pVgroupHashmap); + if (NULL == pBufArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return rewriteToVnodeModifOpStmt(pQuery, pBufArray); +} + +typedef struct SVgroupDropTableBatch { + SVDropTbBatchReq req; + SVgroupInfo info; + char dbName[TSDB_DB_NAME_LEN]; +} SVgroupDropTableBatch; + +static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) { + SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists}; + SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); + if (NULL == pTableBatch) { + SVgroupDropTableBatch tBatch = {0}; + tBatch.info = *pVgInfo; + tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); + taosArrayPush(tBatch.req.pArray, &req); + + taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch)); + } else { // add to the correct vgroup + taosArrayPush(pTableBatch->req.pArray, &req); + } +} + +static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, bool* pIsSuperTable, + SHashObj* pVgroupHashmap) { + STableMeta* pTableMeta = NULL; + int32_t code = getTableMeta(pCxt, pClause->dbName, pClause->tableName, &pTableMeta); + + if (TSDB_CODE_SUCCESS == code && TSDB_SUPER_TABLE == pTableMeta->tableType) { + *pIsSuperTable = true; + goto over; + } + + *pIsSuperTable = false; + + SVgroupInfo info = {0}; + if (TSDB_CODE_SUCCESS == code) { + code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info); + } + if (TSDB_CODE_SUCCESS == code) { + addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info); + } + +over: + taosMemoryFreeClear(pTableMeta); + return code; +} + +static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); } + +static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) { + int tlen; + SCoder coder = {0}; + + int32_t ret = 0; + tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret); + tlen += sizeof(SMsgHead); + void* buf = taosMemoryMalloc(tlen); + if (NULL == buf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + ((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId); + ((SMsgHead*)buf)->contLen = htonl(tlen); + void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER); + tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req); + tCoderClear(&coder); + + SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == pVgData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pVgData->vg = pTbBatch->info; + pVgData->pData = buf; + pVgData->size = tlen; + pVgData->numOfTables = (int32_t)taosArrayGetSize(pTbBatch->req.pArray); + taosArrayPush(pBufArray, &pVgData); + + return TSDB_CODE_SUCCESS; +} + +static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) { + SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); + if (NULL == pBufArray) { + return NULL; + } + + int32_t code = TSDB_CODE_SUCCESS; + SVgroupDropTableBatch* pTbBatch = NULL; + do { + pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); + if (pTbBatch == NULL) { + break; + } + + serializeVgroupDropTableBatch(pTbBatch, pBufArray); + destroyDropTbReqBatch(pTbBatch); + } while (true); + + return pBufArray; +} + +static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { + SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot; + + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pVgroupHashmap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + bool isSuperTable = false; + SNode* pNode; + FOREACH(pNode, pStmt->pTables) { + int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap); + if (TSDB_CODE_SUCCESS != code) { + taosHashCleanup(pVgroupHashmap); + return code; + } + if (isSuperTable && LIST_LENGTH(pStmt->pTables) > 1) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_STABLE); + } + } + + if (isSuperTable) { + taosHashCleanup(pVgroupHashmap); + return TSDB_CODE_SUCCESS; + } + + SArray* pBufArray = serializeVgroupsDropTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); taosHashCleanup(pVgroupHashmap); if (NULL == pBufArray) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3565,6 +3701,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_CREATE_MULTI_TABLE_STMT: code = rewriteCreateMultiTable(pCxt, pQuery); break; + case QUERY_NODE_DROP_TABLE_STMT: + code = rewriteDropTable(pCxt, pQuery); + break; case QUERY_NODE_ALTER_TABLE_STMT: if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) { code = rewriteAlterTable(pCxt, pQuery); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index d75328300899d4d0bf9d817a0717a6cdabced70e..2bf4071d3395d8bf29e9a3b9aedde76ec595b072 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -126,6 +126,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "slimit/soffset only available for PARTITION BY query"; case TSDB_CODE_PAR_INVALID_TOPIC_QUERY: return "Invalid topic query"; + case TSDB_CODE_PAR_INVALID_DROP_STABLE: + return "Cannot drop super table in batch"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 18e59859accdcc9cb2efc0df20842ef43a5844f5..634c146d4b62dac48c7cd8c916fc4571e856ede9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -944,9 +944,16 @@ static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator* } static int32_t getMsgType(ENodeType sqlType) { - return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) - ? TDMT_VND_CREATE_TABLE - : TDMT_VND_SUBMIT; + switch (sqlType) { + case QUERY_NODE_CREATE_TABLE_STMT: + case QUERY_NODE_CREATE_MULTI_TABLE_STMT: + return TDMT_VND_CREATE_TABLE; + case QUERY_NODE_DROP_TABLE_STMT: + return TDMT_VND_DROP_TABLE; + default: + break; + } + return TDMT_VND_SUBMIT; } static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt, SLogicNode** pLogicNode) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 31d2da93802e18ed00306b909daba44665cc622e..a87ec20f0c1729fa9ebbefe7bed6caa323f03fab 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -245,6 +245,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_VND_CREATE_TABLE_RSP: + case TDMT_VND_DROP_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: break; default: @@ -369,7 +370,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan * child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); + SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -401,7 +402,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan * parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n); + SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n); SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -491,7 +492,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SSchLevel level = {0}; SNodeListNode *plans = NULL; int32_t taskNum = 0; - SSchLevel * pLevel = NULL; + SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -1094,6 +1095,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; } + case TDMT_VND_DROP_TABLE_RSP: { + SVDropTbBatchRsp batchRsp = {0}; + if (msg) { + SCoder coder = {0}; + tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER); + code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp); + if (TSDB_CODE_SUCCESS == code && batchRsp.pArray) { + int32_t num = taosArrayGetSize(batchRsp.pArray); + for (int32_t i = 0; i < num; ++i) { + SVDropTbRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { + tCoderClear(&coder); + SCH_ERR_JRET(rsp->code); + } + } + } + tCoderClear(&coder); + SCH_ERR_JRET(code); + } + + SCH_ERR_JRET(rspCode); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + break; + } case TDMT_VND_SUBMIT_RSP: { if (msg) { SSubmitRsp *rsp = (SSubmitRsp *)msg; @@ -1267,7 +1292,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - SSchTask * pTask = NULL; + SSchTask *pTask = NULL; SSchJob *pJob = schAcquireJob(pParam->refId); if (NULL == pJob) { @@ -1316,6 +1341,10 @@ int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } +int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code); +} + int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } @@ -1412,6 +1441,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; break; + case TDMT_VND_DROP_TABLE: + *fp = schHandleDropTableCallback; + break; case TDMT_VND_SUBMIT: *fp = schHandleSubmitCallback; break; @@ -1617,8 +1649,8 @@ _return: int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { int32_t code = 0; SSchHbCallbackParam *param = NULL; - SMsgSendInfo * pMsgSendInfo = NULL; - SQueryNodeAddr * addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SMsgSendInfo *pMsgSendInfo = NULL; + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; @@ -1759,10 +1791,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { } SRpcCtxVal dst = {0}; - void * pIter = taosHashIterate(pSrc->args, NULL); + void *pIter = taosHashIterate(pSrc->args, NULL); while (pIter) { SRpcCtxVal *pVal = (SRpcCtxVal *)pIter; - int32_t * msgType = taosHashGetKey(pIter, NULL); + int32_t *msgType = taosHashGetKey(pIter, NULL); dst = *pVal; dst.val = NULL; @@ -1916,7 +1948,7 @@ _return: int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { uint32_t msgSize = 0; - void * msg = NULL; + void *msg = NULL; int32_t code = 0; bool isCandidateAddr = false; bool persistHandle = false; @@ -1931,6 +1963,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, switch (msgType) { case TDMT_VND_CREATE_TABLE: + case TDMT_VND_DROP_TABLE: case TDMT_VND_SUBMIT: { msgSize = pTask->msgLen; msg = taosMemoryCalloc(1, msgSize); @@ -2673,7 +2706,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); for (int32_t m = 0; m < pLevel->taskNum; ++m) { - SSchTask * pTask = taosArrayGet(pLevel->subTasks, m); + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; taosArrayPush(pSub, &subDesc);