diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 48b1138072004926d9bd5dafc2cb89b6c89d6945..b09d34cd466b2eab42b8dfc298e92540bec76eba 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -166,7 +166,7 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); void tscFieldInfoClear(SFieldInfo* pFieldInfo); -void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc); +void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList); static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; } @@ -192,11 +192,14 @@ SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t function size_t tscNumOfExprs(SQueryInfo* pQueryInfo); SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index); int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); +int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy); void tscExprAssign(SExprInfo* dst, const SExprInfo* src); void tscExprDestroy(SArray* pExprInfo); int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num); +void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta); + SColumn* tscColumnClone(const SColumn* src); bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 20a2164149d3a4faa43cd7b5ce0b728d90b33d10..72432f602cfd81e02770dbad6570c9834db94105 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { SSchedMsg schedMsg = {0}; schedMsg.fp = doRetrieveSubqueryData; - schedMsg.ahandle = (void *)pSql->self; + schedMsg.ahandle = (void *)pSql; schedMsg.thandle = (void *)1; schedMsg.msg = 0; taosScheduleTask(tscQhandle, &schedMsg); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index e1bcf3a7922bba763f2e27a4983664fcb138d963..77a4c7fb464652d8b92b08e4877b6601afb8b529 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -974,7 +974,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { SSDataBlock* pBlock = NULL; if (pInfo->currentGroupOffset == 0) { - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -982,7 +982,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { while ((*newgroup) == false) { // ignore the remain blocks - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -994,7 +994,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { return pBlock; } - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -1008,7 +1008,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { } while ((*newgroup) == false) { - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 195c310be59c3f6f9cb9c1ae3009f03207e8b455..3866db6c3545178087ae1f9dd5cc7a6887bf0938 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -803,7 +803,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC } STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX); - code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql); + code = tscSetTableFullName(&pSTableMetaInfo->name, &sToken, pSql); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 212ccc9747dc0ffae2393f3bea2acc7d7b50e1e2..aeddbda1d6da2e2e663af7c59aab9da912374ee8 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -7149,6 +7149,9 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t size = taosArrayGetSize(pInfo->list); for (int32_t i = 0; i < size; ++i) { SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i); + if (pSqlNode->from == NULL) { + goto _end; + } // load the table meta in the from clause if (pSqlNode->from->type == SQL_NODE_FROM_TABLELIST) { @@ -7330,6 +7333,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); + printf("-------------queryinfo:%p\n", pSub); tscInitQueryInfo(pSub); int32_t code = validateSqlNode(pSql, p, pSub); @@ -7408,6 +7412,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { + clearAllTableMetaInfo(pQueryInfo, false); pQueryInfo->numOfTables = 0; // parse the subquery in the first place diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index eb6189406194dd0a91e2bbe752f2a35a19670f9d..dbe5c8c8fd9293c7a96e176bf296e9b6b1e1228b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1997,6 +1997,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { rsp += sizeof(SMultiTableMeta); SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param); + if(pParentSql == NULL) { + return pSql->res.code; + } + SSqlCmd *pParentCmd = &pParentSql->cmd; SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -2006,12 +2010,16 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; int32_t code = tableMetaMsgConvert(pMetaMsg); if (code != TSDB_CODE_SUCCESS) { + taosHashCleanup(pSet); + taosReleaseRef(tscObjRef, pParentSql->self); return code; } STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname); + taosHashCleanup(pSet); + taosReleaseRef(tscObjRef, pParentSql->self); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -2062,6 +2070,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables); taosHashCleanup(pSet); + taosReleaseRef(tscObjRef, pParentSql->self); return TSDB_CODE_SUCCESS; } @@ -2090,51 +2099,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { int32_t size = 0; pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); - /* size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); - pInfo->vgroupList = calloc(1, vgroupsz); - assert(pInfo->vgroupList != NULL); - - pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups; - if (pInfo->vgroupList->numOfVgroups <= 0) { - tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", pSql->self); - } else { - for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { - // just init, no need to lock - SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j]; - - SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; - vmsg->vgId = htonl(vmsg->vgId); - vmsg->numOfEps = vmsg->numOfEps; - for (int32_t k = 0; k < vmsg->numOfEps; ++k) { - vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); - } - - SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); - pVgroup->numOfEps = newVi.numOfEps; - pVgroup->vgId = newVi.vgId; - for (int32_t k = 0; k < vmsg->numOfEps; ++k) { - pVgroup->epAddr[k].port = newVi.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); - } - - // check if current buffer contains the vgroup info. - // If not, add it - SNewVgroupInfo existVgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); - - if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || - (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it - taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); - tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap)); - } - } - } -*/ pMsg += size; } taosReleaseRef(tscObjRef, parent->self); - return pSql->res.code; } @@ -2378,8 +2346,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { tscSetResRawPtr(pRes, pQueryInfo); } -// handleDownstreamOperator(pRes, pQueryInfo); - if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0ca79bc702b8dfb6ac41c5d28ce2e88b2dea31ce..5df75d0cd284d8dadbaba700f1c1ecfa51c71d5a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2894,7 +2894,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo pQueryInfo->limit.limit = -1; pQueryInfo->limit.offset = 0; - assert(/*pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 &&*/ trsupport->subqueryIndex < pSql->subState.numOfSub); + assert(trsupport->subqueryIndex < pSql->subState.numOfSub); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); @@ -3577,17 +3577,9 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable } } -// for (int32_t i = 0; i < numOfOutput; ++i) { -// SExprInfo* pExprInfo = &pExprs[i]; -// if (pExprInfo->pExpr != NULL) { -// tExprTreeDestroy(pExprInfo->pExpr, NULL); -// pExprInfo->pExpr = NULL; -// } -// } -// -// tfree(pExprs); - + // todo refactor: filter should not be applied here. createFilterInfo(pQueryAttr, 0); + pQueryAttr->numOfFilterCols = 0; SArray* pa = NULL; if (stage == MASTER_SCAN) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1d18d7f1e2206e09c7b149cea2713664c34c9da8..871b105e9125c425f0bba3f05bd292a1e5b4a9c5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -31,7 +31,6 @@ #include "ttokendef.h" static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); -static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta); static void tscStrToLower(char *str, int32_t n) { if (str == NULL || n <= 0) { return;} @@ -716,7 +715,6 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { return pBlock; } -static int32_t v = 0; SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -735,11 +733,6 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); pStatus->index = 0; - if (i == 0 && pStatus->pBlock != NULL) { - v += pStatus->pBlock->info.rows; - printf("---------------%d\n", v); - } - if (pStatus->pBlock == NULL) { pOperator->status = OP_EXEC_DONE; @@ -901,6 +894,13 @@ SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t return pOptr; } +static void destroyJoinOperator(void* param, int32_t numOfOutput) { + SJoinOperatorInfo* pInfo = (SJoinOperatorInfo*) param; + tfree(pInfo->status); + + pInfo->pRes = destroyOutputBuf(pInfo->pRes); +} + SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); @@ -931,7 +931,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre pOperator->blockingOptr = false; pOperator->info = pInfo; pOperator->exec = doDataBlockJoin; - pOperator->cleanup = destroyDummyInputOperator; + pOperator->cleanup = destroyJoinOperator; for(int32_t i = 0; i < numOfUpstream; ++i) { appendUpstream(pOperator, pUpstream[i]); @@ -1075,8 +1075,9 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { while(pQueryInfo != NULL) { SQueryInfo* p = pQueryInfo->sibling; - if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { - SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, 0); + int32_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream); + for(int32_t i = 0; i < numOfUpstream; ++i) { + SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i); freeQueryInfoImpl(pUp); clearAllTableMetaInfo(pUp, removeMeta); @@ -1097,11 +1098,11 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { } tfree(pQueryInfo); - pQueryInfo = p; } pCmd->pQueryInfo = NULL; + pCmd->active = NULL; } void destroyTableNameList(SSqlCmd* pCmd) { @@ -1132,6 +1133,9 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); tscFreeQueryInfo(pCmd, removeMeta); + + taosHashCleanup(pCmd->pTableMetaMap); + pCmd->pTableMetaMap = NULL; } void tscFreeSqlResult(SSqlObj* pSql) { @@ -1834,8 +1838,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { memset(pFieldInfo, 0, sizeof(SFieldInfo)); } -void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) { - assert(pFieldInfo != NULL && pSrc != NULL); +void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList) { + assert(pFieldInfo != NULL && pSrc != NULL && pExprList != NULL); pFieldInfo->numOfOutput = pSrc->numOfOutput; if (pSrc->final != NULL) { @@ -1845,13 +1849,25 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) { if (pSrc->internalField != NULL) { size_t num = taosArrayGetSize(pSrc->internalField); + size_t numOfExpr = taosArrayGetSize(pExprList); + for (int32_t i = 0; i < num; ++i) { SInternalField* pfield = taosArrayGet(pSrc->internalField, i); SInternalField p = {.visible = pfield->visible, .field = pfield->field}; - p.pExpr = calloc(1, sizeof(SExprInfo)); - tscExprAssign(p.pExpr, pfield->pExpr); + int32_t resColId = pfield->pExpr->base.resColId; + for(int32_t j = 0; j < numOfExpr; ++j) { + SExprInfo* pExpr = taosArrayGetP(pExprList, j); + if (pExpr->base.resColId == resColId) { + p.pExpr = pExpr; + break; + } + } +// p.pExpr = calloc(1, sizeof(SExprInfo)); + +// tscExprAssign(p.pExpr, pfield->pExpr); + taosArrayPush(pFieldInfo->internalField, &p); } } } @@ -2012,7 +2028,7 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) for (int32_t i = 0; i < size; ++i) { SExprInfo* pExpr = taosArrayGetP(src, i); - if (uid != 0 && pExpr->base.uid == uid) { + if (pExpr->base.uid == uid) { if (deepcopy) { SExprInfo* p1 = calloc(1, sizeof(SExprInfo)); tscExprAssign(p1, pExpr); @@ -2029,6 +2045,26 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) return 0; } +int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy) { + assert(src != NULL && dst != NULL); + + size_t size = taosArrayGetSize(src); + for (int32_t i = 0; i < size; ++i) { + SExprInfo* pExpr = taosArrayGetP(src, i); + + if (deepcopy) { + SExprInfo* p1 = calloc(1, sizeof(SExprInfo)); + tscExprAssign(p1, pExpr); + + taosArrayPush(dst, &p1); + } else { + taosArrayPush(dst, &pExpr); + } + } + + return 0; +} + bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) { // ignore the tbname columnIndex to be inserted into source list if (columnIndex < 0) { @@ -2688,7 +2724,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { pQueryInfo->fillType = pSrc->fillType; pQueryInfo->fillVal = NULL; pQueryInfo->clauseLimit = pSrc->clauseLimit; - pQueryInfo->numOfTables = pSrc->numOfTables; + pQueryInfo->numOfTables = 0; pQueryInfo->window = pSrc->window; pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->pTableMetaInfo = NULL; @@ -2728,14 +2764,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); } - tscColumnListCopyAll(pQueryInfo->colList, pSrc->colList); - tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSrc->fieldsInfo); - - if (tscExprCopy(pQueryInfo->exprList, pSrc->exprList, 0, true) != 0) { + if (tscExprCopyAll(pQueryInfo->exprList, pSrc->exprList, true) != 0) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + tscColumnListCopyAll(pQueryInfo->colList, pSrc->colList); + tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSrc->fieldsInfo, pQueryInfo->exprList); + for(int32_t i = 0; i < pSrc->numOfTables; ++i) { STableMetaInfo* p1 = tscGetMetaInfo((SQueryInfo*) pSrc, i); @@ -2822,28 +2858,29 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscClearTableMetaInfo(pTableMetaInfo); + free(pTableMetaInfo); } - + tfree(pQueryInfo->pTableMetaInfo); } STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableMeta* pTableMeta, SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) { - void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); - if (pAlloc == NULL) { + void* tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); + if (tmp == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - pQueryInfo->pTableMetaInfo = pAlloc; + pQueryInfo->pTableMetaInfo = tmp; STableMetaInfo* pTableMetaInfo = calloc(1, sizeof(STableMetaInfo)); + if (pTableMetaInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; @@ -3018,10 +3055,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t SSqlCmd* pnCmd = &pNew->cmd; memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); - + pnCmd->command = cmd; pnCmd->payload = NULL; pnCmd->allocSize = 0; + pnCmd->pTableMetaMap = NULL; pnCmd->pQueryInfo = NULL; pnCmd->clauseIndex = 0; @@ -3206,7 +3244,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } void doRetrieveSubqueryData(SSchedMsg *pMsg) { - SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle); + SSqlObj* pSql = (SSqlObj*) pMsg->ahandle; if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle); return; @@ -3246,7 +3284,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { SSchedMsg schedMsg = {0}; schedMsg.fp = doRetrieveSubqueryData; - schedMsg.ahandle = (void *)pParentSql->self; + schedMsg.ahandle = (void *)pParentSql; schedMsg.thandle = (void *)1; schedMsg.msg = 0; taosScheduleTask(tscQhandle, &schedMsg); @@ -3311,7 +3349,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { tscQueryInfoCopy(pNewQueryInfo, pSub); // create sub query to handle the sub query. - executeQuery(pNew, pSub); + executeQuery(pNew, pNewQueryInfo); } // merge sub query result and generate final results