提交 492cb923 编写于 作者: L Liu Jicong

extract output name from ast

上级 58122d07
...@@ -78,7 +78,7 @@ int32_t create_stream() { ...@@ -78,7 +78,7 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) from st1"; const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
......
...@@ -2374,6 +2374,13 @@ typedef struct { ...@@ -2374,6 +2374,13 @@ typedef struct {
int32_t reserved; int32_t reserved;
} SStreamTaskExecRsp; } SStreamTaskExecRsp;
typedef struct {
SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSmaSinkReq;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -697,12 +697,12 @@ typedef struct { ...@@ -697,12 +697,12 @@ typedef struct {
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* outputName;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "mndDef.h" #include "mndDef.h"
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t outputNameSz = 0;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
...@@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
} else { } else {
tEncodeI32(pEncoder, 0); tEncodeI32(pEncoder, 0);
} }
if (pObj->outputName != NULL) {
outputNameSz = taosArrayGetSize(pObj->outputName);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->outputName, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
return pEncoder->pos; return pEncoder->pos;
} }
...@@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
} else { } else {
pObj->tasks = NULL; pObj->tasks = NULL;
} }
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->outputName == NULL) {
return -1;
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->outputName, &name);
}
return 0; return 0;
} }
...@@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { ...@@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0; return 0;
} }
static SArray *mndExtractNamesFromAst(const SNode *pAst) {
if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL;
SArray *names = taosArrayInit(0, sizeof(void *));
if (names == NULL) {
return NULL;
}
SSelectStmt *pSelect = (SSelectStmt *)pAst;
SNodeList *pNodes = pSelect->pProjectionList;
SListCell *pCell = pNodes->pHead;
while (pCell != NULL) {
if (pCell->pNode->type != QUERY_NODE_FUNCTION) {
continue;
}
SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode;
char *name = strdup(pFunction->node.aliasName);
taosArrayPush(names, &name);
pCell = pCell->pNext;
}
return names;
}
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) { if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -259,6 +281,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -259,6 +281,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
return -1;
}
SArray *names = mndExtractNamesFromAst(pAst);
printf("|");
for (int i = 0; i < taosArrayGetSize(names); i++) {
printf(" %15s |", (char *)taosArrayGetP(names, i));
}
printf("\n=======================================================\n");
streamObj.outputName = names;
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册