提交 031a13c9 编写于 作者: X Xiaoyu Wang

fix: create stream check and error code wrong

上级 4a13f0ed
...@@ -427,9 +427,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -427,9 +427,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
...@@ -443,7 +445,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -443,7 +445,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
} }
pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -482,7 +484,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -482,7 +484,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = code;
if (pInfo != NULL) { if (pInfo != NULL) {
destroyGroupOperatorInfo(pInfo); destroyGroupOperatorInfo(pInfo);
} }
...@@ -991,8 +993,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* ...@@ -991,8 +993,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
if (pTableSup->numOfExprs > 0) { if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1); void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
char* tbName = pSrcBlock->info.parTbName; char* tbName = pSrcBlock->info.parTbName;
memset(tbName, 0, TSDB_TABLE_NAME_LEN); memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len); memcpy(tbName, varDataVal(pData), len);
...@@ -1011,7 +1013,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* ...@@ -1011,7 +1013,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++; pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock); blockDataDestroy(pTmpBlock);
} }
...@@ -1020,7 +1022,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* ...@@ -1020,7 +1022,8 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info; SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if ( (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || taosHashGetSize(pInfo->pPartitions) == 0) { if ((pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) ||
taosHashGetSize(pInfo->pPartitions) == 0) {
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pCreateTbRes); blockDataCleanup(pInfo->pCreateTbRes);
...@@ -1029,9 +1032,9 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { ...@@ -1029,9 +1032,9 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
while (pInfo->pTbNameIte != NULL) { while (pInfo->pTbNameIte != NULL) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
int32_t rowId = *(int32_t*) taosArrayGet(pParInfo->rowIds, 0); int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte); pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
} }
return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL; return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL;
...@@ -1200,12 +1203,14 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { ...@@ -1200,12 +1203,14 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo)); SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
int32_t code = TSDB_CODE_SUCCESS;
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys); pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys);
if (pPartNode->part.pExprs != NULL) { if (pPartNode->part.pExprs != NULL) {
...@@ -1237,11 +1242,11 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1237,11 +1242,11 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t numOfTags; int32_t numOfTags;
SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags); SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
if (pTagExpr == NULL) { if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
} }
...@@ -1262,6 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1262,6 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc); pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
if (pInfo->binfo.pRes == NULL) { if (pInfo->binfo.pRes == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
...@@ -1291,7 +1297,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1291,7 +1297,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = code;
destroyStreamPartitionOperatorInfo(pInfo); destroyStreamPartitionOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
......
...@@ -5021,6 +5021,18 @@ static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pCo ...@@ -5021,6 +5021,18 @@ static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pCo
return NULL; return NULL;
} }
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
int32_t numOfCols = getNumOfColumns(pTableMeta);
SSchema* pColsSchema = getTableColumnSchema(pTableMeta);
for (int32_t i = 0; i < numOfCols; ++i) {
const SSchema* pSchema = pColsSchema + i;
if (0 == strcmp(pColName, pSchema->name)) {
return pSchema;
}
}
return NULL;
}
static SSchema* getTagSchema(const STableMeta* pTableMeta, const char* pTagName) { static SSchema* getTagSchema(const STableMeta* pTableMeta, const char* pTagName) {
int32_t numOfTags = getNumOfTags(pTableMeta); int32_t numOfTags = getNumOfTags(pTableMeta);
SSchema* pTagsSchema = getTableTagSchema(pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
...@@ -5906,7 +5918,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol ...@@ -5906,7 +5918,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol
SNode* pCol = NULL; SNode* pCol = NULL;
SNode* pProj = NULL; SNode* pProj = NULL;
FORBOTH(pCol, pCols, pProj, *pProjections) { FORBOTH(pCol, pCols, pProj, *pProjections) {
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName); const SSchema* pSchema = getNormalColSchema(pMeta, ((SColumnNode*)pCol)->colName);
if (NULL == pSchema) { if (NULL == pSchema) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册