diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 045c92b5860dd7719caf6e6f2fecee0ecdc6e10e..3b26f8b9efaf1cc1c17806729dc1d777e91186d4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -311,6 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName if (NULL == vgInfo) { ctgError("no hash range found for hashvalue[%u]", hashValue); + + void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter1) { + vgInfo = pIter1; + ctgError("valid range:[%d, %d], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId); + pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1); + } + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -773,7 +781,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); _return: - if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index d48041fbdbb2792d06d3c63a5750bd985fbdc1cb..e944fb01bdf5615cc4c04b5cee4bf1e937d76142 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tdDestroyKVRowBuilder(pKvRowBuilder); return buildInvalidOperationMsg(pMsgBuf, msg1); } } @@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa const char* msg3 = "tag value too long"; const char* msg4 = "illegal value or data overflow"; + int32_t code = 0; + STableMeta* pSuperTableMeta = NULL; + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // super table name, create table by using dst @@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SToken* pSTableNameToken = &pCreateTableInfo->stbName; - int32_t code = parserValidateNameToken(pSTableNameToken); + code = parserValidateNameToken(pSTableNameToken); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SName name = {0}; code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } SKVRowBuilder kvRowBuilder = {0}; if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } SArray* pValList = pCreateTableInfo->pTagVals; size_t numOfInputTag = taosArrayGetSize(pValList); - STableMeta* pSuperTableMeta = NULL; code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } assert(pSuperTableMeta != NULL); @@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } bool findColumnIndex = false; @@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pItem->pVar.nLen > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { @@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true); // check again after the convert since it may be converted from binary to nchar. - if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pSchema->type)) { int16_t len = varDataTLen(tagVal); if (len > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); @@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa } else { if (schemaSize != numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return code; + goto _error; } } SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - tfree(pSuperTableMeta); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } tdSortKVRowByColIdx(row); @@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SName tableName = {0}; code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tfree(pSuperTableMeta); - return code; + goto _error; } // Find a appropriate vgroup to accommodate this table , according to the table name SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + code = catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info); tfree(pSuperTableMeta); } *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap); + if (*pBufArray == NULL) { + code = terrno; + goto _error; + } taosHashCleanup(pVgroupHashmap); return TSDB_CODE_SUCCESS; + + _error: + taosHashCleanup(pVgroupHashmap); + tfree(pSuperTableMeta); + terrno = code; + return code; } static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 779a2646996e1614701c48e3e11aa861687903eb..a7ec39bfdef719627eb89ec7b14c1a69533c9f58 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -67,8 +67,8 @@ typedef struct SSchTask { int32_t msgLen; // msg length int8_t status; // task status SQueryNodeAddr execAddr; // task actual executed node address - int8_t condidateIdx; // current try condidation index - SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr + int8_t candidateIdx; // current try condidation index + SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 69df5dddeea9cb795658d969ece414700082597a..38a823de2ccbf848787940ea4c3fadf1fe673e81 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { } static void cleanupTask(SSchTask* pTask) { - taosArrayDestroy(pTask->condidateAddrs); + taosArrayDestroy(pTask->candidateAddrs); } int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { @@ -226,20 +226,20 @@ _return: SCH_RET(code); } -int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { - if (task->condidateAddrs) { +int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { + if (task->candidateAddrs) { return TSDB_CODE_SUCCESS; } - task->condidateIdx = 0; - task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == task->condidateAddrs) { + task->candidateIdx = 0; + task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == task->candidateAddrs) { qError("taosArrayInit failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (task->plan->execNode.numOfEps > 0) { - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -798,7 +798,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SEpSet epSet; - SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); + SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx); schConvertAddrToEpSet(addr, &epSet); @@ -816,9 +816,9 @@ _return: int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); + SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); - if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { + if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }