未验证 提交 af560c84 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10950 from taosdata/feature/tq

extract output name from ast
......@@ -78,7 +78,7 @@ int32_t create_stream() {
taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) from st1";
const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) {
......
......@@ -2374,6 +2374,13 @@ typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
typedef struct {
SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSmaSinkReq;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -697,12 +697,12 @@ typedef struct {
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* outputName;
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,7 @@
#include "mndDef.h"
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t outputNameSz = 0;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
......@@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
} else {
tEncodeI32(pEncoder, 0);
}
if (pObj->outputName != NULL) {
outputNameSz = taosArrayGetSize(pObj->outputName);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->outputName, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
return pEncoder->pos;
}
......@@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
} else {
pObj->tasks = NULL;
}
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->outputName == NULL) {
return -1;
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->outputName, &name);
}
return 0;
}
......@@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static SArray *mndExtractNamesFromAst(const SNode *pAst) {
if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL;
SArray *names = taosArrayInit(0, sizeof(void *));
if (names == NULL) {
return NULL;
}
SSelectStmt *pSelect = (SSelectStmt *)pAst;
SNodeList *pNodes = pSelect->pProjectionList;
SListCell *pCell = pNodes->pHead;
while (pCell != NULL) {
if (pCell->pNode->type != QUERY_NODE_FUNCTION) {
continue;
}
SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode;
char *name = strdup(pFunction->node.aliasName);
taosArrayPush(names, &name);
pCell = pCell->pNext;
}
return names;
}
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS;
......@@ -259,6 +281,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
/*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented";
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
return -1;
}
SArray *names = mndExtractNamesFromAst(pAst);
printf("|");
for (int i = 0; i < taosArrayGetSize(names); i++) {
printf(" %15s |", (char *)taosArrayGetP(names, i));
}
printf("\n=======================================================\n");
streamObj.outputName = names;
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册