diff --git a/example/src/tstream.c b/example/src/tstream.c index 6976d9e398726b5fa44917476cd11e601c01f1d0..51578bd27b54eba0a50c3bf5bc18066138ad5b48 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -78,7 +78,7 @@ int32_t create_stream() { taos_free_result(pRes); /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - const char* sql = "select min(k), max(k), sum(k) from st1"; + const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ pRes = tmq_create_stream(pConn, "stream1", "out1", sql); if (taos_errno(pRes) != 0) { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 812025e8e46a4ed747b270d12ca3020558270e03..6a7edb481f34f0a830e75eaf56d49c513bbe313f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2374,6 +2374,13 @@ typedef struct { int32_t reserved; } SStreamTaskExecRsp; +typedef struct { + SMsgHead head; + int64_t streamId; + int64_t version; + SArray* res; // SArray +} SStreamSmaSinkReq; + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 909486aaac211b405a3fc468fc151f9b46573648..30b4c923d9bbc7057cc54ae1ee7aea66f8cf96fd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -697,12 +697,12 @@ typedef struct { char* logicalPlan; char* physicalPlan; SArray* tasks; // SArray> + SArray* outputName; } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index f81dead3259c3e43d0287829dfcd33be6ba2fad1..f0905f88d2ce0382bb8a7e2a35d13a05fab27216 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -16,6 +16,7 @@ #include "mndDef.h" int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { + int32_t outputNameSz = 0; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; @@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { } else { tEncodeI32(pEncoder, 0); } + + if (pObj->outputName != NULL) { + outputNameSz = taosArrayGetSize(pObj->outputName); + } + if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1; + for (int32_t i = 0; i < outputNameSz; i++) { + char *name = taosArrayGetP(pObj->outputName, i); + if (tEncodeCStr(pEncoder, name) < 0) return -1; + } return pEncoder->pos; } @@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { } else { pObj->tasks = NULL; } + int32_t outputNameSz; + if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; + pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *)); + if (pObj->outputName == NULL) { + return -1; + } + for (int32_t i = 0; i < outputNameSz; i++) { + char *name; + if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; + taosArrayPush(pObj->outputName, &name); + } return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2ca4e10f6845f265391d6d0f180feb65ce4a36be..d9b5341ba15e505caea25f353cd634a9e4d37c48 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } +static SArray *mndExtractNamesFromAst(const SNode *pAst) { + if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL; + + SArray *names = taosArrayInit(0, sizeof(void *)); + if (names == NULL) { + return NULL; + } + SSelectStmt *pSelect = (SSelectStmt *)pAst; + SNodeList *pNodes = pSelect->pProjectionList; + SListCell *pCell = pNodes->pHead; + while (pCell != NULL) { + if (pCell->pNode->type != QUERY_NODE_FUNCTION) { + continue; + } + SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode; + char *name = strdup(pFunction->node.aliasName); + taosArrayPush(names, &name); + pCell = pCell->pNext; + } + return names; +} + static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { if (NULL == pCreate->ast) { return TSDB_CODE_SUCCESS; @@ -259,6 +281,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; + SNode *pAst = NULL; + if (nodesStringToNode(pCreate->ast, &pAst) < 0) { + return -1; + } + SArray *names = mndExtractNamesFromAst(pAst); + printf("|"); + for (int i = 0; i < taosArrayGetSize(names); i++) { + printf(" %15s |", (char *)taosArrayGetP(names, i)); + } + printf("\n=======================================================\n"); + + streamObj.outputName = names; + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); return -1;