diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b0b0adbe782795c18ef3ec09272dc6fa23e7415a..48a2893dfb8c3ce9e0c5a0b5a83360d46cfa6b5c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2623,6 +2623,40 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); } +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->tableName.type) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->tableName.acctId) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tableName.dbname) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tableName.tname) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->tableName.type) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->tableName.acctId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tableName.dbname) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tableName.tname) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + + int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; @@ -2655,4 +2689,3 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { return buf; } - \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 326f7ca3e935164930c2cab6af9354d9cd08e17f..800e530e2f568e37a7e906d899787af6d09438cf 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8098,8 +8098,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); -int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { - int32_t code = 0; +SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); } @@ -8111,18 +8110,18 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, char tableFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pScanPhyNode->tableName, tableFName); - code = vnodeValidateTableHash(pHandle->config, tableFName); + int32_t code = vnodeValidateTableHash(pHandle->config, tableFName); if (code) { errInfo->code = code; errInfo->tableName = pScanPhyNode->tableName; - return code; + return NULL; } size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - pTaskInfo->pRoot = createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; @@ -8132,11 +8131,7 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; - code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); - if (code) { - return code; - } - + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); SArray* idList = NULL; if (groupInfo.numOfTables > 0) { @@ -8155,11 +8150,9 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, idList = taosArrayInit(4, sizeof(uint64_t)); } - // SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo); - // taosArrayDestroy(idList); - - // //TODO destroy groupInfo - // return pOperator; +// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo); + taosArrayDestroy(idList); +// return pOperator; } } @@ -8169,14 +8162,14 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - code = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); - if (code) { - return code; + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); + if (errInfo->code) { + return NULL; } SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - pTaskInfo->pRoot = createAggregateOperatorInfo(pTaskInfo->pRoot, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); + return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); @@ -8188,12 +8181,6 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ - - if (pTaskInfo->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - return code; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) { @@ -8267,8 +8254,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - code = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); - if (code) { + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); + if (errInfo->code) { + code = errInfo->code; + goto _complete; + } + + if ((*pTaskInfo)->pRoot == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; }