提交 b7368e21 编写于 作者: H Haojun Liao

[td-13039]add api for stream computing.

上级 fb94d2da
...@@ -6,7 +6,7 @@ target_include_directories( ...@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
target_link_libraries( target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor mnode scheduler sdb wal transport cjson sync monitor parser
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "mndStream.h" #include "mndStream.h"
#include "parser.h"
#include "mndAuth.h" #include "mndAuth.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
...@@ -218,28 +219,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { ...@@ -218,28 +219,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0; return 0;
} }
static SArray *mndExtractNamesFromAst(const SNode *pAst) {
if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL;
SArray *names = taosArrayInit(0, sizeof(void *));
if (names == NULL) {
return NULL;
}
SSelectStmt *pSelect = (SSelectStmt *)pAst;
SNodeList *pNodes = pSelect->pProjectionList;
SListCell *pCell = pNodes->pHead;
while (pCell != NULL) {
if (pCell->pNode->type != QUERY_NODE_FUNCTION) {
continue;
}
SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode;
char *name = strdup(pFunction->node.aliasName);
taosArrayPush(names, &name);
pCell = pCell->pNext;
}
return names;
}
static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
if (NULL == ast) { if (NULL == ast) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -273,14 +252,16 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -273,14 +252,16 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1; return -1;
} }
#if 1 #if 1
SArray *names = mndExtractNamesFromAst(pAst); SSchemaWrapper sw = {0};
qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema);
printf("|"); printf("|");
for (int i = 0; i < taosArrayGetSize(names); i++) { for (int i = 0; i < sw.nCols; i++) {
printf(" %15s |", (char *)taosArrayGetP(names, i)); printf(" %15s |", (char *)sw.pSchema[i].name);
} }
printf("\n=======================================================\n"); printf("\n=======================================================\n");
pStream->ColAlias = names; pStream->ColAlias = NULL;
#endif #endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
......
...@@ -1696,24 +1696,27 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) { ...@@ -1696,24 +1696,27 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
return code; return code;
} }
static int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) { int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (QUERY_NODE_SELECT_STMT == nodeType(pQuery->pRoot)) { if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
SSelectStmt* pSelect = (SSelectStmt*)pQuery->pRoot; SSelectStmt* pSelect = (SSelectStmt*) pRoot;
pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList); *numOfCols = LIST_LENGTH(pSelect->pProjectionList);
pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema)); *pSchema = calloc((*numOfCols), sizeof(SSchema));
if (NULL == pQuery->pResSchema) { if (NULL == (*pSchema)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); return TSDB_CODE_OUT_OF_MEMORY;
} }
SNode* pNode; SNode* pNode;
int32_t index = 0; int32_t index = 0;
FOREACH(pNode, pSelect->pProjectionList) { FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode; SExprNode* pExpr = (SExprNode*)pNode;
pQuery->pResSchema[index].type = pExpr->resType.type; (*pSchema)[index].type = pExpr->resType.type;
pQuery->pResSchema[index].bytes = pExpr->resType.bytes; (*pSchema)[index].bytes = pExpr->resType.bytes;
strcpy(pQuery->pResSchema[index].name, pExpr->aliasName); (*pSchema)[index].colId = index + 1;
strcpy((*pSchema)[index].name, pExpr->aliasName);
index +=1; index +=1;
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2297,7 +2300,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -2297,7 +2300,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->haveResultSet = true; pQuery->haveResultSet = true;
pQuery->directRpc = false; pQuery->directRpc = false;
pQuery->msgType = TDMT_VND_QUERY; pQuery->msgType = TDMT_VND_QUERY;
code = setReslutSchema(pCxt, pQuery); code = qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema);
break; break;
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->haveResultSet = false; pQuery->haveResultSet = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册