From 031a13c94584d68a69f75ed63e9a65e9459f864c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 14 Feb 2023 11:24:44 +0800 Subject: [PATCH] fix: create stream check and error code wrong --- source/libs/executor/src/groupoperator.c | 30 ++++++++++++++---------- source/libs/parser/src/parTranslater.c | 14 ++++++++++- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 45f38a43e6..370e1f62de 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -427,9 +427,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -443,7 +445,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -482,7 +484,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; if (pInfo != NULL) { destroyGroupOperatorInfo(pInfo); } @@ -991,8 +993,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); - void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1); - char* tbName = pSrcBlock->info.parTbName; + void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1); + char* tbName = pSrcBlock->info.parTbName; memset(tbName, 0, TSDB_TABLE_NAME_LEN); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(tbName, varDataVal(pData), len); @@ -1011,7 +1013,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); - + pDestBlock->info.rows++; blockDataDestroy(pTmpBlock); } @@ -1020,7 +1022,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { SStreamPartitionOperatorInfo* pInfo = pOperator->info; - if ( (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || taosHashGetSize(pInfo->pPartitions) == 0) { + if ((pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || + taosHashGetSize(pInfo->pPartitions) == 0) { return NULL; } blockDataCleanup(pInfo->pCreateTbRes); @@ -1029,9 +1032,9 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { while (pInfo->pTbNameIte != NULL) { SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; - int32_t rowId = *(int32_t*) taosArrayGet(pParInfo->rowIds, 0); + int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0); appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, - pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); + pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte); } return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL; @@ -1200,12 +1203,14 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - int32_t code = TSDB_CODE_SUCCESS; + pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys); if (pPartNode->part.pExprs != NULL) { @@ -1237,11 +1242,11 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr int32_t numOfTags; SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags); if (pTagExpr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } } @@ -1262,6 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc); if (pInfo->binfo.pRes == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -1291,7 +1297,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; destroyStreamPartitionOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); return NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1347193d99..af247b6e65 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5021,6 +5021,18 @@ static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pCo return NULL; } +static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) { + int32_t numOfCols = getNumOfColumns(pTableMeta); + SSchema* pColsSchema = getTableColumnSchema(pTableMeta); + for (int32_t i = 0; i < numOfCols; ++i) { + const SSchema* pSchema = pColsSchema + i; + if (0 == strcmp(pColName, pSchema->name)) { + return pSchema; + } + } + return NULL; +} + static SSchema* getTagSchema(const STableMeta* pTableMeta, const char* pTagName) { int32_t numOfTags = getNumOfTags(pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pTableMeta); @@ -5906,7 +5918,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol SNode* pCol = NULL; SNode* pProj = NULL; FORBOTH(pCol, pCols, pProj, *pProjections) { - const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName); + const SSchema* pSchema = getNormalColSchema(pMeta, ((SColumnNode*)pCol)->colName); if (NULL == pSchema) { code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName); } -- GitLab