提交 c2aebb25 编写于 作者: X Xiaoyu Wang

TD-13675 create sma index parser implement

上级 eae2d7b6
......@@ -51,6 +51,7 @@ typedef struct SQuery {
SSchema* pResSchema;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
bool streamQuery;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -26,6 +26,7 @@ typedef struct SPlanContext {
uint64_t queryId;
int32_t acctId;
SNode* pAstRoot;
bool streamQuery;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "parser.h"
#include "planner.h"
#include "query.h"
#include "taos.h"
#include "tcommon.h"
......@@ -229,6 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
// --- heartbeat
// global, called by mgmt
......
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdef.h"
......@@ -211,7 +209,6 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
......
......@@ -482,21 +482,19 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
}
tscDebug("start to create topic, %s", topicName);
#if 0
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo*)pQueryNode;
pQueryStmtInfo->info.continueQuery = true;
pQueryNode->streamQuery = true;
// todo check for invalid sql statement and return with error code
SSchema* schema = NULL;
int32_t numOfCols = 0;
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId),
_return);
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, NULL), _return);
pStr = qDagToString(pRequest->body.pDag);
pStr = qQueryPlanToString(pRequest->body.pDag);
if (pStr == NULL) {
goto _return;
}
......@@ -506,7 +504,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
// The topic should be related to a database that the queried table is belonged to.
SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName);
// tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName);
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&name, topicName, T_NAME_TABLE);
......@@ -538,7 +536,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
#endif
_return:
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
......
......@@ -216,6 +216,14 @@ static EDealRes destroyNode(SNode** pNode, void* pContext) {
case QUERY_NODE_NODE_LIST:
nodesClearList(((SNodeListNode*)(*pNode))->pNodeList);
break;
case QUERY_NODE_INDEX_OPTIONS: {
SIndexOptions* pStmt = (SIndexOptions*)*pNode;
nodesDestroyList(pStmt->pFuncs);
nodesDestroyNode(pStmt->pInterval);
nodesDestroyNode(pStmt->pOffset);
nodesDestroyNode(pStmt->pSliding);
break;
}
case QUERY_NODE_SELECT_STMT: {
SSelectStmt* pStmt = (SSelectStmt*)*pNode;
nodesDestroyList(pStmt->pProjectionList);
......@@ -256,6 +264,12 @@ static EDealRes destroyNode(SNode** pNode, void* pContext) {
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
nodesDestroyList(((SCreateMultiTableStmt*)(*pNode))->pSubTables);
break;
case QUERY_NODE_CREATE_INDEX_STMT: {
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)*pNode;
nodesDestroyNode(pStmt->pOptions);
nodesDestroyList(pStmt->pCols);
break;
}
default:
break;
}
......
......@@ -197,9 +197,9 @@ cmd ::= CREATE FULLTEXT INDEX
index_options(A) ::= . { A = NULL; }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
NK_LP duration_literal(C) NK_RP sliding_opt(D). { A = createIndexOption(pCxt, B, C, NULL, D); }
NK_LP duration_literal(C) NK_RP sliding_opt(D). { A = createIndexOption(pCxt, B, releaseRawExprNode(pCxt, C), NULL, D); }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
NK_LP duration_literal(C) NK_COMMA duration_literal(D) NK_RP sliding_opt(E). { A = createIndexOption(pCxt, B, C, D, E); }
NK_LP duration_literal(C) NK_COMMA duration_literal(D) NK_RP sliding_opt(E). { A = createIndexOption(pCxt, B, releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, D), E); }
%type func_list { SNodeList* }
%destructor func_list { nodesDestroyList($$); }
......
......@@ -795,14 +795,14 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS
buildCreateDbReq(pCxt, pStmt, &createReq);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_CREATE_DB;
pCxt->pCmdMsg->msgLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSCreateDbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createReq);
......@@ -818,14 +818,14 @@ static int32_t translateDropDatabase(STranslateContext* pCxt, SDropDatabaseStmt*
dropReq.ignoreNotExists = pStmt->ignoreNotExists;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_DROP_DB;
pCxt->pCmdMsg->msgLen = tSerializeSDropDbReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSDropDbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
......@@ -859,7 +859,7 @@ static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableSt
tNameExtractFullName(&tableName, createReq.name);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
tFreeSMCreateStbReq(&createReq);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -867,7 +867,7 @@ static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableSt
pCxt->pCmdMsg->msgType = TDMT_MND_CREATE_STB;
pCxt->pCmdMsg->msgLen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
tFreeSMCreateStbReq(&createReq);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -883,14 +883,14 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
dropReq.igNotExists = ignoreNotExists;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_DROP_STB;
pCxt->pCmdMsg->msgLen = tSerializeSMDropStbReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSMDropStbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
......@@ -936,14 +936,14 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_USE_DB;
pCxt->pCmdMsg->msgLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSUseDbReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &usedbReq);
......@@ -959,14 +959,14 @@ static int32_t translateCreateUser(STranslateContext* pCxt, SCreateUserStmt* pSt
strcpy(createReq.pass, pStmt->password);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_CREATE_USER;
pCxt->pCmdMsg->msgLen = tSerializeSCreateUserReq(NULL, 0, &createReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSCreateUserReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createReq);
......@@ -985,14 +985,14 @@ static int32_t translateAlterUser(STranslateContext* pCxt, SAlterUserStmt* pStmt
}
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_ALTER_USER;
pCxt->pCmdMsg->msgLen = tSerializeSAlterUserReq(NULL, 0, &alterReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSAlterUserReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &alterReq);
......@@ -1005,14 +1005,14 @@ static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt)
strcpy(dropReq.user, pStmt->useName);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_DROP_USER;
pCxt->pCmdMsg->msgLen = tSerializeSDropUserReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSDropUserReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
......@@ -1026,14 +1026,14 @@ static int32_t translateCreateDnode(STranslateContext* pCxt, SCreateDnodeStmt* p
createReq.port = pStmt->port;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_CREATE_DNODE;
pCxt->pCmdMsg->msgLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSCreateDnodeReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createReq);
......@@ -1048,14 +1048,14 @@ static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt
dropReq.port = pStmt->port;
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_DROP_DNODE;
pCxt->pCmdMsg->msgLen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSDropDnodeReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &dropReq);
......@@ -1093,14 +1093,14 @@ static int32_t translateShow(STranslateContext* pCxt, SShowStmt* pStmt) {
}
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_MND_SHOW;
pCxt->pCmdMsg->msgLen = tSerializeSShowReq(NULL, 0, &showReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSShowReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &showReq);
......@@ -1128,7 +1128,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
pShowReq->head.vgId = htonl(info->vgId);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = info->epSet;
......@@ -1167,23 +1167,25 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
createSmaReq.tSma.tableUid = pMeta->uid;
createSmaReq.tSma.interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
createSmaReq.tSma.sliding = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : 0);
code = nodesListToString(pStmt->pCols, false, &createSmaReq.tSma.expr, &createSmaReq.tSma.exprLen);
code = nodesListToString(pStmt->pOptions->pFuncs, false, &createSmaReq.tSma.expr, &createSmaReq.tSma.exprLen);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_VND_CREATE_SMA;
pCxt->pCmdMsg->msgLen = tSerializeSVCreateTSmaReq(NULL, &createSmaReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL== pCxt->pCmdMsg->pMsg) {
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tSerializeSVCreateTSmaReq(pCxt->pCmdMsg->pMsg, &createSmaReq);
void* pBuf = pCxt->pCmdMsg->pMsg;
tSerializeSVCreateTSmaReq(&pBuf, &createSmaReq);
// todo clear SVCreateTSmaReq
return TSDB_CODE_SUCCESS;
}
......
......@@ -2292,10 +2292,10 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[1].minor.yy344 = NULL; }
break;
case 97: /* index_options ::= FUNCTION NK_LP func_list NK_RP INTERVAL NK_LP duration_literal NK_RP sliding_opt */
{ yymsp[-8].minor.yy344 = createIndexOption(pCxt, yymsp[-6].minor.yy280, yymsp[-2].minor.yy344, NULL, yymsp[0].minor.yy344); }
{ yymsp[-8].minor.yy344 = createIndexOption(pCxt, yymsp[-6].minor.yy280, releaseRawExprNode(pCxt, yymsp[-2].minor.yy344), NULL, yymsp[0].minor.yy344); }
break;
case 98: /* index_options ::= FUNCTION NK_LP func_list NK_RP INTERVAL NK_LP duration_literal NK_COMMA duration_literal NK_RP sliding_opt */
{ yymsp[-10].minor.yy344 = createIndexOption(pCxt, yymsp[-8].minor.yy280, yymsp[-4].minor.yy344, yymsp[-2].minor.yy344, yymsp[0].minor.yy344); }
{ yymsp[-10].minor.yy344 = createIndexOption(pCxt, yymsp[-8].minor.yy280, releaseRawExprNode(pCxt, yymsp[-4].minor.yy344), releaseRawExprNode(pCxt, yymsp[-2].minor.yy344), yymsp[0].minor.yy344); }
break;
case 101: /* func ::= function_name NK_LP expression_list NK_RP */
{ yylhsminor.yy344 = createFunctionNode(pCxt, &yymsp[-3].minor.yy209, yymsp[-1].minor.yy280); }
......
......@@ -402,6 +402,6 @@ TEST_F(ParserTest, createTable) {
TEST_F(ParserTest, createSmaIndex) {
setDatabase("root", "test");
bind("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4))");
bind("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4)) INTERVAL(10s)");
ASSERT_TRUE(run());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册