未验证 提交 c3e098d7 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #10944 from taosdata/feature/3.0_wxy

reorganize physical plan code
......@@ -135,6 +135,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN,
......@@ -169,6 +170,7 @@ void nodesDestroyNode(SNodeptr pNode);
SNodeList* nodesMakeList();
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode);
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode);
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode);
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
......
......@@ -95,6 +95,7 @@ typedef struct SWindowLogicNode {
int8_t intervalUnit;
int8_t slidingUnit;
SFillNode* pFill;
int64_t sessionGap;
} SWindowLogicNode;
typedef enum ESubplanType {
......@@ -110,7 +111,7 @@ typedef struct SSubplanId {
int32_t subplanId;
} SSubplanId;
typedef struct SSubLogicPlan {
typedef struct SLogicSubplan {
ENodeType type;
SSubplanId id;
SNodeList* pChildren;
......@@ -120,7 +121,7 @@ typedef struct SSubLogicPlan {
SVgroupsInfo* pVgroupList;
int32_t level;
int32_t splitFlag;
} SSubLogicPlan;
} SLogicSubplan;
typedef struct SQueryLogicPlan {
ENodeType type;
......@@ -213,10 +214,14 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SIntervalPhysiNode {
typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
SWinodwPhysiNode window;
int64_t interval;
int64_t offset;
int64_t sliding;
......@@ -225,6 +230,11 @@ typedef struct SIntervalPhysiNode {
SFillNode* pFill;
} SIntervalPhysiNode;
typedef struct SSessionWinodwPhysiNode {
SWinodwPhysiNode window;
int64_t gap;
} SSessionWinodwPhysiNode;
typedef struct SDataSinkNode {
ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc;
......
......@@ -191,8 +191,8 @@ typedef struct SStateWindowNode {
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
int64_t gap; // gap between two session window(in microseconds)
SNode* pCol;
SNode* pGap; // gap between two session window(in microseconds)
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
......
......@@ -287,6 +287,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->dbFName);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU32(buf, pReq->ttl);
tlen += taosEncodeFixedU32(buf, pReq->keep);
......@@ -360,6 +361,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeString(buf, &(pReq->dbFName));
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep));
......@@ -478,7 +480,7 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
buf = taosDecodeFixedU32(buf, &nsize);
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req;
SVCreateTbReq req = {0};
buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req);
}
......
......@@ -158,6 +158,7 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
for (int i = 0; i < 1; ++i) {
SVCreateTbReq req = {0};
req.ver = 0;
req.dbFName = (char*)"1.db1";
req.name = (char*)"stb1";
req.ttl = 0;
req.keep = 0;
......@@ -229,6 +230,7 @@ TEST_F(DndTestVnode, 04_Alter_Stb) {
for (int i = 0; i < 1; ++i) {
SVCreateTbReq req = {0};
req.ver = 0;
req.dbFName = (char*)"1.db1";
req.name = (char*)"stb1";
req.ttl = 0;
req.keep = 0;
......
......@@ -286,9 +286,12 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0};
req.ver = 0;
req.dbFName = dbFName;
req.name = (char *)tNameGetTableName(&name);
req.ttl = 0;
req.keep = 0;
......
......@@ -8487,7 +8487,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, NULL, &num);
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding,
......
......@@ -272,11 +272,14 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
CLONE_NODE_FIELD(pFill);
COPY_SCALAR_FIELD(sessionGap);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType);
return (SNode*)pDst;
......@@ -358,7 +361,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
default:
break;
}
......
......@@ -70,6 +70,14 @@ const char* nodesNodeName(ENodeType type) {
return "SlotDesc";
case QUERY_NODE_COLUMN_DEF:
return "ColumnDef";
case QUERY_NODE_DOWNSTREAM_SOURCE:
return "DownstreamSource";
case QUERY_NODE_DATABASE_OPTIONS:
return "DatabaseOptions";
case QUERY_NODE_TABLE_OPTIONS:
return "TableOptions";
case QUERY_NODE_INDEX_OPTIONS:
return "IndexOptions";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
......@@ -78,16 +86,76 @@ const char* nodesNodeName(ENodeType type) {
return "VnodeModifStmt";
case QUERY_NODE_CREATE_DATABASE_STMT:
return "CreateDatabaseStmt";
case QUERY_NODE_DROP_DATABASE_STMT:
return "DropDatabaseStmt";
case QUERY_NODE_ALTER_DATABASE_STMT:
return "AlterDatabaseStmt";
case QUERY_NODE_CREATE_TABLE_STMT:
return "CreateTableStmt";
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
return "CreateSubtableClause";
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return "CreateMultiTableStmt";
case QUERY_NODE_DROP_TABLE_CLAUSE:
return "DropTableClause";
case QUERY_NODE_DROP_TABLE_STMT:
return "DropTableStmt";
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
return "DropSuperTableStmt";
case QUERY_NODE_ALTER_TABLE_STMT:
return "AlterTableStmt";
case QUERY_NODE_CREATE_USER_STMT:
return "CreateUserStmt";
case QUERY_NODE_ALTER_USER_STMT:
return "AlterUserStmt";
case QUERY_NODE_DROP_USER_STMT:
return "DropUserStmt";
case QUERY_NODE_USE_DATABASE_STMT:
return "UseDatabaseStmt";
case QUERY_NODE_CREATE_DNODE_STMT:
return "CreateDnodeStmt";
case QUERY_NODE_DROP_DNODE_STMT:
return "DropDnodeStmt";
case QUERY_NODE_ALTER_DNODE_STMT:
return "AlterDnodeStmt";
case QUERY_NODE_CREATE_INDEX_STMT:
return "CreateIndexStmt";
case QUERY_NODE_DROP_INDEX_STMT:
return "DropIndexStmt";
case QUERY_NODE_CREATE_QNODE_STMT:
return "CreateQnodeStmt";
case QUERY_NODE_DROP_QNODE_STMT:
return "DropQnodeStmt";
case QUERY_NODE_CREATE_TOPIC_STMT:
return "CreateTopicStmt";
case QUERY_NODE_DROP_TOPIC_STMT:
return "DropTopicStmt";
case QUERY_NODE_ALTER_LOCAL_STMT:
return "AlterLocalStmt";
case QUERY_NODE_SHOW_DATABASES_STMT:
return "ShowDatabaseStmt";
case QUERY_NODE_SHOW_TABLES_STMT:
return "ShowTablesStmt";
case QUERY_NODE_CREATE_TOPIC_STMT:
return "CreateTopicStmt";
case QUERY_NODE_SHOW_STABLES_STMT:
return "ShowStablesStmt";
case QUERY_NODE_SHOW_USERS_STMT:
return "ShowUsersStmt";
case QUERY_NODE_SHOW_DNODES_STMT:
return "ShowDnodesStmt";
case QUERY_NODE_SHOW_VGROUPS_STMT:
return "ShowVgroupsStmt";
case QUERY_NODE_SHOW_MNODES_STMT:
return "ShowMnodesStmt";
case QUERY_NODE_SHOW_MODULES_STMT:
return "ShowModulesStmt";
case QUERY_NODE_SHOW_QNODES_STMT:
return "ShowQnodesStmt";
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
return "ShowFunctionsStmt";
case QUERY_NODE_SHOW_INDEXES_STMT:
return "ShowIndexesStmt";
case QUERY_NODE_SHOW_STREAMS_STMT:
return "ShowStreamsStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -98,6 +166,10 @@ const char* nodesNodeName(ENodeType type) {
return "LogicProject";
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return "LogicVnodeModif";
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return "LogicExchange";
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return "LogicWindow";
case QUERY_NODE_LOGIC_SUBPLAN:
return "LogicSubplan";
case QUERY_NODE_LOGIC_PLAN:
......@@ -124,6 +196,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return "PhysiInterval";
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......@@ -846,8 +920,37 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkIntervalPhysiPlanExprs = "Exprs";
static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkWindowPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkWindowPhysiPlanFuncs, pNode->pFuncs);
}
return code;
}
static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkWindowPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkWindowPhysiPlanFuncs, &pNode->pFuncs);
}
return code;
}
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
......@@ -858,13 +961,7 @@ static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanFuncs, pNode->pFuncs);
}
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanInterval, pNode->interval);
}
......@@ -890,13 +987,7 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanFuncs, &pNode->pFuncs);
}
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanInterval, &pNode->interval);
}
......@@ -919,6 +1010,30 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkSessionWindowPhysiPlanGap = "Gap";
static int32_t physiSessionWindowNodeToJson(const void* pObj, SJson* pJson) {
const SSessionWinodwPhysiNode* pNode = (const SSessionWinodwPhysiNode*)pObj;
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSessionWindowPhysiPlanGap, pNode->gap);
}
return code;
}
static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) {
SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
......@@ -2066,6 +2181,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......@@ -2075,7 +2192,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN:
return planToJson(pObj, pJson);
default:
assert(0);
// assert(0);
break;
}
nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj)));
......@@ -2149,14 +2266,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return jsonToSubplan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN:
return jsonToPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
default:
assert(0);
break;
......
......@@ -79,9 +79,14 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_STATE_WINDOW:
res = walkNode(((SStateWindowNode*)pNode)->pCol, order, walker, pContext);
break;
case QUERY_NODE_SESSION_WINDOW:
res = walkNode(((SSessionWindowNode*)pNode)->pCol, order, walker, pContext);
case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
res = walkNode(pSession->pCol, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
res = walkNode(pSession->pGap, order, walker, pContext);
}
break;
}
case QUERY_NODE_INTERVAL_WINDOW: {
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
res = walkNode(pInterval->pInterval, order, walker, pContext);
......
......@@ -160,7 +160,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan));
return makeNode(type, sizeof(SLogicSubplan));
case QUERY_NODE_LOGIC_PLAN:
return makeNode(type, sizeof(SQueryLogicPlan));
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
......@@ -185,6 +185,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
......@@ -332,6 +334,7 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
if (NULL == pNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodesListAppend(pList, pNode);
......@@ -341,6 +344,17 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
return code;
}
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) {
if (NULL == *pList) {
*pList = nodesMakeList();
if (NULL == *pList) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return nodesListAppend(*pList, pNode);
}
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
if (NULL == pTarget || NULL == pSrc) {
return TSDB_CODE_SUCCESS;
......
......@@ -100,7 +100,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft, SNode* pRight, SNode* pJoinCond);
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset);
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, const SToken* pVal);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
......
......@@ -624,7 +624,7 @@ partition_by_clause_opt(A) ::= PARTITION BY expression_list(B).
twindow_clause_opt(A) ::= . { A = NULL; }
twindow_clause_opt(A) ::=
SESSION NK_LP column_reference(B) NK_COMMA NK_INTEGER(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), &C); }
SESSION NK_LP column_reference(B) NK_COMMA duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP column_reference(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
twindow_clause_opt(A) ::=
INTERVAL NK_LP duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); }
......
......@@ -679,11 +679,11 @@ SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order
return (SNode*)orderByExpr;
}
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, const SToken* pVal) {
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap) {
SSessionWindowNode* session = (SSessionWindowNode*)nodesMakeNode(QUERY_NODE_SESSION_WINDOW);
CHECK_OUT_OF_MEM(session);
session->pCol = pCol;
// session->gap = getInteger(pVal);
session->pGap = pGap;
return (SNode*)session;
}
......
......@@ -1870,14 +1870,21 @@ static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema
}
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
tfree(pReq->dbFName);
tfree(pReq->name);
tfree(pReq->ntbCfg.pSchema);
}
static int32_t buildNormalTableBatchReq(
const char* pDbName, const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, const char* pTableName,
const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
strcpy(name.dbname, pDbName);
tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.dbFName = strdup(dbFName);
req.name = strdup(pTableName);
req.ntbCfg.nCols = LIST_LENGTH(pColumns);
req.ntbCfg.pSchema = calloc(req.ntbCfg.nCols, sizeof(SSchema));
......@@ -1904,7 +1911,7 @@ static int32_t buildNormalTableBatchReq(
return TSDB_CODE_SUCCESS;
}
static int32_t serializeVgroupTablesBatch(int32_t acctId, SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (NULL == buf) {
......@@ -1932,6 +1939,7 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->dbFName);
tfree(pTableReq->name);
if (pTableReq->type == TSDB_NORMAL_TABLE) {
......@@ -1973,9 +1981,9 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
}
SVgroupTablesBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
int32_t code = buildNormalTableBatchReq(acctId, pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupTablesBatch(acctId, &tbatch, *pBufArray);
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
}
destroyCreateTbReqBatch(&tbatch);
......@@ -2004,9 +2012,16 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap,
const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
strcpy(name.dbname, pDbName);
tNameGetFullDbName(&name, dbFName);
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.dbFName = strdup(dbFName);
req.name = strdup(pTableName);
req.ctbCfg.suid = suid;
req.ctbCfg.pTag = row;
......@@ -2159,7 +2174,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
}
tfree(pSuperTableMeta);
......@@ -2181,7 +2196,7 @@ static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHash
break;
}
serializeVgroupTablesBatch(acctId, pTbBatch, pBufArray);
serializeVgroupTablesBatch(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
......
......@@ -71,6 +71,7 @@ int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...) {
va_start(vArgList, errCode);
vsnprintf(pBuf->buf, pBuf->len, getSyntaxErrFormat(errCode), vArgList);
va_end(vArgList);
terrno = errCode;
return errCode;
}
......
此差异已折叠。
......@@ -44,6 +44,9 @@ protected:
query_ = nullptr;
bool res = runImpl(parseCode, translateCode);
qDestroyQuery(query_);
if (!res) {
dump();
}
return res;
}
......@@ -53,24 +56,38 @@ private:
bool runImpl(int32_t parseCode, int32_t translateCode) {
int32_t code = doParse(&cxt_, &query_);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] parser code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return (TSDB_CODE_SUCCESS != parseCode);
parseErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
return (terrno == parseCode);
}
if (TSDB_CODE_SUCCESS != parseCode) {
return false;
}
string parserStr = toString(query_->pRoot);
parsedAstStr_ = toString(query_->pRoot);
code = doTranslate(&cxt_, query_);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] translate code:" << code << ", " << translateCode << ", msg:" << errMagBuf_ << endl;
return (code == translateCode);
translateErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
return (terrno == translateCode);
}
translatedAstStr_ = toString(query_->pRoot);
return (TSDB_CODE_SUCCESS == translateCode);
}
void dump() {
cout << "input sql : [" << cxt_.pSql << "]" << endl;
cout << "parser output: " << endl;
cout << parserStr << endl;
if (!parseErrStr_.empty()) {
cout << "parse error: " << parseErrStr_ << endl;
}
if (!parsedAstStr_.empty()) {
cout << "parse output: " << endl;
cout << parsedAstStr_ << endl;
}
if (!translateErrStr_.empty()) {
cout << "translate error: " << translateErrStr_ << endl;
}
if (!translatedAstStr_.empty()) {
cout << "translate output: " << endl;
cout << toString(query_->pRoot) << endl;
return (TSDB_CODE_SUCCESS == translateCode);
cout << translatedAstStr_ << endl;
}
}
string toString(const SNode* pRoot, bool format = false) {
......@@ -91,6 +108,10 @@ private:
memset(errMagBuf_, 0, max_err_len);
cxt_.pMsg = errMagBuf_;
cxt_.msgLen = max_err_len;
parseErrStr_.clear();
parsedAstStr_.clear();
translateErrStr_.clear();
translatedAstStr_.clear();
}
string acctId_;
......@@ -99,8 +120,50 @@ private:
string sqlBuf_;
SParseContext cxt_;
SQuery* query_;
string parseErrStr_;
string parsedAstStr_;
string translateErrStr_;
string translatedAstStr_;
};
TEST_F(ParserTest, createAccount) {
setDatabase("root", "test");
bind("create account ac_wxy pass '123456'");
ASSERT_TRUE(run(TSDB_CODE_PAR_EXPRIE_STATEMENT));
}
TEST_F(ParserTest, alterAccount) {
setDatabase("root", "test");
bind("alter account ac_wxy pass '123456'");
ASSERT_TRUE(run(TSDB_CODE_PAR_EXPRIE_STATEMENT));
}
TEST_F(ParserTest, createUser) {
setDatabase("root", "test");
bind("create user wxy pass '123456'");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, alterUser) {
setDatabase("root", "test");
bind("alter user wxy pass '123456'");
ASSERT_TRUE(run());
bind("alter user wxy privilege 'write'");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, dropUser) {
setDatabase("root", "test");
bind("drop user wxy");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectSimple) {
setDatabase("root", "test");
......@@ -295,12 +358,6 @@ TEST_F(ParserTest, selectSemanticError) {
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION));
}
TEST_F(ParserTest, createUser) {
setDatabase("root", "test");
bind("create user wxy pass '123456'");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, showUsers) {
setDatabase("root", "test");
......@@ -309,12 +366,7 @@ TEST_F(ParserTest, showUsers) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, alterAccount) {
setDatabase("root", "test");
bind("alter account ac_wxy pass '123456'");
ASSERT_TRUE(run(TSDB_CODE_PAR_EXPRIE_STATEMENT));
}
TEST_F(ParserTest, createDnode) {
setDatabase("root", "test");
......
......@@ -56,9 +56,10 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t applySplitRule(SSubLogicPlan* pSubplan);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
#ifdef __cplusplus
}
......
......@@ -411,6 +411,38 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
return code;
}
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) {
int32_t code = nodesCollectFuncs(pSelect, fmIsAggFunc, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pWindow;
} else {
nodesDestroyNode(pWindow);
}
return code;
}
static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionWindowNode* pSession, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) {
......@@ -424,34 +456,15 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
if (NULL == pWindow->pFill) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectFuncs(pSelect, fmIsAggFunc, &pWindow->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pWindow->pFuncs, &pWindow->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pWindow;
} else {
nodesDestroyNode(pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return code;
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
......@@ -460,6 +473,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
}
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_SESSION_WINDOW:
return createWindowLogicNodeBySession(pCxt, (SSessionWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
default:
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planInt.h"
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planInt.h"
typedef struct SScaleOutContext {
SPlanContext* pPlanCxt;
int32_t subplanId;
} SScaleOutContext;
static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
SLogicSubplan* pDst = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pDst) {
return NULL;
}
pDst->pNode = nodesCloneNode(pSrc->pNode);
if (NULL == pDst->pNode) {
nodesDestroyNode(pDst);
return NULL;
}
pDst->subplanType = pSrc->subplanType;
pDst->level = level;
pDst->id.queryId = pSrc->id.queryId;
pDst->id.groupId = pSrc->id.groupId;
pDst->id.subplanId = pCxt->subplanId++;
return pDst;
}
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode;
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, pNewSubplan)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
if (NULL == pScan->pVgroupList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
*pFound = true;
return TSDB_CODE_SUCCESS;
}
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) {
int32_t code = doSetScanVgroup((SLogicNode*)pChild, pVgroup, pFound);
if (TSDB_CODE_SUCCESS != code || *pFound) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
bool found = false;
return doSetScanVgroup(pNode, pVgroup, &found);
}
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
if (pSubplan->pVgroupList) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pGroup, pNewSubplan);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
} else {
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}
}
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
int32_t code = TSDB_CODE_SUCCESS;
bool topLevel = (0 == LIST_LENGTH(pParentsGroup));
SNode* pChild = NULL;
FOREACH(pChild, pCurrentGroup) {
if (topLevel) {
code = nodesListAppend(pParentsGroup, pChild);
} else {
SNode* pParent = NULL;
FOREACH(pParent, pParentsGroup) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
}
}
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t* pLevel, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList();
if (NULL == pCurrentGroup) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
switch (pSubplan->subplanType) {
case SUBPLAN_TYPE_MERGE:
code = scaleOutForMerge(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
case SUBPLAN_TYPE_SCAN:
code = scaleOutForScan(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, *pLevel, pCurrentGroup);
break;
default:
break;
}
if (TSDB_CODE_SUCCESS == code) {
code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
++(*pLevel);
}
if (TSDB_CODE_SUCCESS == code) {
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
code = doScaleOut(pCxt, (SLogicSubplan*)pChild, pLevel, pCurrentGroup);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pCurrentGroup);
}
return code;
}
static SQueryLogicPlan* makeQueryLogicPlan() {
SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
if (NULL == pLogicPlan) {
return NULL;
}
pLogicPlan->pTopSubplans = nodesMakeList();
if (NULL == pLogicPlan->pTopSubplans) {
nodesDestroyNode(pLogicPlan);
return NULL;
}
return pLogicPlan;
}
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
SQueryLogicPlan* pPlan = makeQueryLogicPlan();
if (NULL == pPlan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SScaleOutContext cxt = { .pPlanCxt = pCxt, .subplanId = 1 };
int32_t code = doScaleOut(&cxt, pLogicSubplan, &(pPlan->totalLevel), pPlan->pTopSubplans);
if (TSDB_CODE_SUCCESS == code) {
*pLogicPlan = pPlan;
} else {
nodesDestroyNode(pPlan);
}
return code;
}
......@@ -29,7 +29,7 @@ typedef struct SSplitContext {
void* pInfo;
} SSplitContext;
typedef int32_t (*FMatch)(SSplitContext* pCxt, SSubLogicPlan* pSubplan);
typedef int32_t (*FMatch)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
typedef int32_t (*FSplit)(SSplitContext* pCxt);
typedef struct SSplitRule {
......@@ -40,7 +40,7 @@ typedef struct SSplitRule {
typedef struct SStsInfo {
SScanLogicNode* pScan;
SSubLogicPlan* pSubplan;
SLogicSubplan* pSubplan;
} SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
......@@ -58,7 +58,7 @@ static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
return NULL;
}
static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
static int32_t stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) {
return TSDB_CODE_SUCCESS;
}
......@@ -74,7 +74,7 @@ static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
}
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
int32_t code = stsMatch(pCxt, (SSubLogicPlan*)pChild);
int32_t code = stsMatch(pCxt, (SLogicSubplan*)pChild);
if (TSDB_CODE_SUCCESS != code || pCxt->match) {
return code;
}
......@@ -82,8 +82,8 @@ static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
return TSDB_CODE_SUCCESS;
}
static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) {
SSubLogicPlan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) {
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
}
......@@ -95,7 +95,7 @@ static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
return pSubplan;
}
static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SSubLogicPlan* pSubplan, SScanLogicNode* pScan) {
static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -145,7 +145,7 @@ static const SSplitRule splitRuleSet[] = {
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
int32_t applySplitRule(SSubLogicPlan* pSubplan) {
static int32_t applySplitRule(SLogicSubplan* pSubplan) {
SSplitContext cxt = { .errCode = TSDB_CODE_SUCCESS, .groupId = pSubplan->id.groupId + 1, .match = false, .pInfo = NULL };
bool split = false;
do {
......@@ -164,3 +164,45 @@ int32_t applySplitRule(SSubLogicPlan* pSubplan) {
} while (split);
return TSDB_CODE_SUCCESS;
}
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
doSetLogicNodeParent((SLogicNode*)pChild, pNode);
}
}
static void setLogicNodeParent(SLogicNode* pNode) {
doSetLogicNodeParent(pNode, NULL);
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->pNode = nodesCloneNode(pLogicNode);
if (NULL == pSubplan->pNode) {
nodesDestroyNode(pSubplan);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIF == nodeType(pLogicNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
TSWAP(((SVnodeModifLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifLogicNode*)pSubplan->pNode)->pDataBlocks, SArray*);
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
}
pSubplan->id.queryId = pCxt->queryId;
setLogicNodeParent(pSubplan->pNode);
int32_t code = applySplitRule(pSubplan);
if (TSDB_CODE_SUCCESS == code) {
*pLogicSubplan = pSubplan;
} else {
nodesDestroyNode(pSubplan);
}
return code;
}
\ No newline at end of file
......@@ -17,20 +17,29 @@
#include "planInt.h"
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return TSDB_CODE_SUCCESS;
}
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
SLogicNode* pLogicNode = NULL;
SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicNode);
if (TSDB_CODE_SUCCESS == code) {
code = optimize(pCxt, pLogicNode);
code = optimizeLogicPlan(pCxt, pLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicNode, pPlan, pExecNodeList);
code = splitLogicPlan(pCxt, pLogicNode, &pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan);
}
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
}
nodesDestroyNode(pLogicNode);
nodesDestroyNode(pLogicSubplan);
nodesDestroyNode(pLogicPlan);
terrno = code;
return code;
}
......
......@@ -26,11 +26,6 @@ using namespace testing;
class PlannerTest : public Test {
protected:
enum TestTarget {
TEST_LOGIC_PLAN,
TEST_PHYSICAL_PLAN
};
void setDatabase(const string& acctId, const string& db) {
acctId_ = acctId;
db_ = db;
......@@ -46,7 +41,7 @@ protected:
cxt_.pSql = sqlBuf_.c_str();
}
bool run(TestTarget target = TEST_PHYSICAL_PLAN) {
bool run() {
int32_t code = qParseQuerySql(&cxt_, &query_);
if (code != TSDB_CODE_SUCCESS) {
......@@ -56,12 +51,12 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
SLogicNode* pLogicNode = nullptr;
SPlanContext cxt = { .queryId = 1, .acctId = 0 };
setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicPlan);
code = createLogicPlan(&cxt, &pLogicNode);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
cout << "sql:[" << cxt_.pSql << "] createLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
......@@ -69,15 +64,29 @@ protected:
cout << "syntax test : " << endl;
cout << syntaxTreeStr << endl;
cout << "unformatted logic plan : " << endl;
cout << toString((const SNode*)pLogicPlan, false) << endl;
cout << toString((const SNode*)pLogicNode, false) << endl;
SLogicSubplan* pLogicSubplan = nullptr;
code = splitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] splitLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
SQueryLogicPlan* pLogicPlan = NULL;
code = scaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] createPhysiPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
if (TEST_PHYSICAL_PLAN == target) {
SQueryPlan* pPlan = nullptr;
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
cout << "sql:[" << cxt_.pSql << "] createPhysiPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
cout << "unformatted physical plan : " << endl;
cout << toString((const SNode*)pPlan, false) << endl;
SNode* pNode;
......@@ -88,7 +97,6 @@ protected:
cout << toString(pSubplan, false) << endl;
}
}
}
return true;
}
......@@ -120,14 +128,6 @@ private:
cout << "sql:[" << cxt_.pSql << "] toString code:" << code << ", strerror:" << tstrerror(code) << endl;
return string();
}
SNode* pNode;
code = nodesStringToNode(pStr, &pNode);
if (code != TSDB_CODE_SUCCESS) {
tfree(pStr);
cout << "sql:[" << cxt_.pSql << "] toObject code:" << code << ", strerror:" << tstrerror(code) << endl;
return string();
}
nodesDestroyNode(pNode);
string str(pStr);
tfree(pStr);
return str;
......@@ -185,6 +185,13 @@ TEST_F(PlannerTest, interval) {
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, sessionWindow) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 session(ts, 10s)");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, showTables) {
setDatabase("root", "test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册