From b7368e2133494dc73834411dd4d06192c5c2f3aa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Mar 2022 10:59:15 +0800 Subject: [PATCH] [td-13039]add api for stream computing. --- source/dnode/mnode/impl/CMakeLists.txt | 2 +- source/dnode/mnode/impl/src/mndStream.c | 33 ++++++------------------- source/libs/parser/src/parTranslater.c | 25 ++++++++++--------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 514bba19f4..dd2caf1f7f 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - mnode scheduler sdb wal transport cjson sync monitor + mnode scheduler sdb wal transport cjson sync monitor parser ) if(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bb7891ad2d..ca34938faf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -14,6 +14,7 @@ */ #include "mndStream.h" +#include "parser.h" #include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" @@ -218,28 +219,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static SArray *mndExtractNamesFromAst(const SNode *pAst) { - if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL; - - SArray *names = taosArrayInit(0, sizeof(void *)); - if (names == NULL) { - return NULL; - } - SSelectStmt *pSelect = (SSelectStmt *)pAst; - SNodeList *pNodes = pSelect->pProjectionList; - SListCell *pCell = pNodes->pHead; - while (pCell != NULL) { - if (pCell->pNode->type != QUERY_NODE_FUNCTION) { - continue; - } - SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode; - char *name = strdup(pFunction->node.aliasName); - taosArrayPush(names, &name); - pCell = pCell->pNext; - } - return names; -} - static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { if (NULL == ast) { return TSDB_CODE_SUCCESS; @@ -273,14 +252,16 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } #if 1 - SArray *names = mndExtractNamesFromAst(pAst); + SSchemaWrapper sw = {0}; + qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema); + printf("|"); - for (int i = 0; i < taosArrayGetSize(names); i++) { - printf(" %15s |", (char *)taosArrayGetP(names, i)); + for (int i = 0; i < sw.nCols; i++) { + printf(" %15s |", (char *)sw.pSchema[i].name); } printf("\n=======================================================\n"); - pStream->ColAlias = names; + pStream->ColAlias = NULL; #endif if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f82ce2c1b4..afd29d7f74 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1696,24 +1696,27 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) { return code; } -static int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) { - if (QUERY_NODE_SELECT_STMT == nodeType(pQuery->pRoot)) { - SSelectStmt* pSelect = (SSelectStmt*)pQuery->pRoot; - pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList); - pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema)); - if (NULL == pQuery->pResSchema) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); +int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { + if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) { + SSelectStmt* pSelect = (SSelectStmt*) pRoot; + *numOfCols = LIST_LENGTH(pSelect->pProjectionList); + *pSchema = calloc((*numOfCols), sizeof(SSchema)); + if (NULL == (*pSchema)) { + return TSDB_CODE_OUT_OF_MEMORY; } + SNode* pNode; int32_t index = 0; FOREACH(pNode, pSelect->pProjectionList) { SExprNode* pExpr = (SExprNode*)pNode; - pQuery->pResSchema[index].type = pExpr->resType.type; - pQuery->pResSchema[index].bytes = pExpr->resType.bytes; - strcpy(pQuery->pResSchema[index].name, pExpr->aliasName); + (*pSchema)[index].type = pExpr->resType.type; + (*pSchema)[index].bytes = pExpr->resType.bytes; + (*pSchema)[index].colId = index + 1; + strcpy((*pSchema)[index].name, pExpr->aliasName); index +=1; } } + return TSDB_CODE_SUCCESS; } @@ -2297,7 +2300,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { pQuery->haveResultSet = true; pQuery->directRpc = false; pQuery->msgType = TDMT_VND_QUERY; - code = setReslutSchema(pCxt, pQuery); + code = qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema); break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->haveResultSet = false; -- GitLab