未验证 提交 bf124afb 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10990 from taosdata/feature/3.0_liaohj

[td-13039]add api for stream computing.
......@@ -60,6 +60,8 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
#ifdef __cplusplus
}
#endif
......
......@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor
mnode scheduler sdb wal transport cjson sync monitor parser
)
if(${BUILD_TEST})
......
......@@ -14,6 +14,7 @@
*/
#include "mndStream.h"
#include "parser.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
......@@ -218,28 +219,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static SArray *mndExtractNamesFromAst(const SNode *pAst) {
if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL;
SArray *names = taosArrayInit(0, sizeof(void *));
if (names == NULL) {
return NULL;
}
SSelectStmt *pSelect = (SSelectStmt *)pAst;
SNodeList *pNodes = pSelect->pProjectionList;
SListCell *pCell = pNodes->pHead;
while (pCell != NULL) {
if (pCell->pNode->type != QUERY_NODE_FUNCTION) {
continue;
}
SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode;
char *name = strdup(pFunction->node.aliasName);
taosArrayPush(names, &name);
pCell = pCell->pNext;
}
return names;
}
static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
if (NULL == ast) {
return TSDB_CODE_SUCCESS;
......@@ -276,14 +255,16 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1;
}
#if 1
SArray *names = mndExtractNamesFromAst(pAst);
SSchemaWrapper sw = {0};
qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema);
printf("|");
for (int i = 0; i < taosArrayGetSize(names); i++) {
printf(" %15s |", (char *)taosArrayGetP(names, i));
for (int i = 0; i < sw.nCols; i++) {
printf(" %15s |", (char *)sw.pSchema[i].name);
}
printf("\n=======================================================\n");
pStream->ColAlias = names;
pStream->ColAlias = NULL;
#endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
......
......@@ -2208,7 +2208,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
}
for(int32_t i = 1; i < numOfOutput; ++i) {
(*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i].resDataInfo.interBufSize);
(*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
}
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
......
......@@ -528,11 +528,10 @@ void firstFunction(SqlFunctionCtx *pCtx) {
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
return;
}
......
......@@ -1771,24 +1771,27 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
return code;
}
static int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
if (QUERY_NODE_SELECT_STMT == nodeType(pQuery->pRoot)) {
SSelectStmt* pSelect = (SSelectStmt*)pQuery->pRoot;
pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList);
pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema));
if (NULL == pQuery->pResSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
SSelectStmt* pSelect = (SSelectStmt*) pRoot;
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
*pSchema = calloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode;
int32_t index = 0;
FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
pQuery->pResSchema[index].type = pExpr->resType.type;
pQuery->pResSchema[index].bytes = pExpr->resType.bytes;
strcpy(pQuery->pResSchema[index].name, pExpr->aliasName);
(*pSchema)[index].type = pExpr->resType.type;
(*pSchema)[index].bytes = pExpr->resType.bytes;
(*pSchema)[index].colId = index + 1;
strcpy((*pSchema)[index].name, pExpr->aliasName);
index +=1;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -2372,7 +2375,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->haveResultSet = true;
pQuery->directRpc = false;
pQuery->msgType = TDMT_VND_QUERY;
code = setReslutSchema(pCxt, pQuery);
code = qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema);
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->haveResultSet = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册