diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 1ff3f02da5ac9a30d6b338f797805dbc2a0feae7..8f217a0deb9ff509a4cc39e0f851510129c8bb62 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -22,6 +22,7 @@ extern "C" { #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 +#define QUERY_TYPE_SCAN 3 enum OPERATOR_TYPE_E { OP_TableScan = 1, @@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E { struct SEpSet; struct SQueryPlanNode; -struct SQueryDistPlanNode; +struct SPhyNode; struct SQueryStmtInfo; -typedef struct SSubquery { - int64_t queryId; // the subquery id created by qnode - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL - int32_t level; // the execution level of current subquery, starting from 0. - SArray *pUpstream; // the upstream,from which to fetch the result - struct SQueryDistPlanNode *pNode; // physical plan of current subquery -} SSubquery; +typedef struct SSubplan { + int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN + SArray *pDatasource; // the datasource subplan,from which to fetch the result + struct SPhyNode *pNode; // physical plan of current subplan +} SSubplan; -typedef struct SQueryJob { - SArray **pSubqueries; - int32_t numOfLevels; - int32_t currentLevel; -} SQueryJob; - - -/** - * Optimize the query execution plan, currently not implement yet. - * @param pQueryNode - * @return - */ -int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode); - -/** - * Create the query plan according to the bound AST, which is in the form of pQueryInfo - * @param pQueryInfo - * @param pQueryNode - * @return - */ -int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode); - -/** - * Convert the query plan to string, in order to display it in the shell. - * @param pQueryNode - * @return - */ -int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); +typedef struct SQueryDag { + SArray **pSubplans; +} SQueryDag; /** - * Restore the SQL statement according to the logic query plan. - * @param pQueryNode - * @param sql - * @return + * Create the physical plan for the query, according to the AST. */ -int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); +int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); -/** - * Create the physical plan for the query, according to the logic plan. - * @param pQueryNode - * @param pPhyNode - * @return - */ -int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode); +int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); /** - * Convert to physical plan to string to enable to print it out in the shell. - * @param pPhyNode - * @param str - * @return + * Convert to subplan to string for the scheduler to send to the executor */ -int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str); - -/** - * Destroy the query plan object. - * @return - */ -void* qDestroyQueryPlan(struct SQueryPlanNode* pQueryNode); +int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str); /** * Destroy the physical plan. * @param pQueryPhyNode * @return */ -void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode); - -/** - * Create the query job from the physical execution plan - * @param pPhyNode - * @param pJob - * @return - */ -int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob); +void* qDestroyQueryDag(struct SQueryDag* pDag); #ifdef __cplusplus } diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index d9653046cfadc84683e8922b2ad94ebe2149017b..6b3c9ed021c6629684bf1e5a7a7fca7bca2b9551 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,7 +20,42 @@ extern "C" { #endif -struct SQueryJob; +typedef struct SQueryProfileSummary { + int64_t startTs; // Object created and added into the message queue + int64_t endTs; // the timestamp when the task is completed + int64_t cputime; // total cpu cost, not execute elapsed time + + int64_t loadRemoteDataDuration; // remote io time + int64_t loadNativeDataDuration; // native disk io time + + uint64_t loadNativeData; // blocks + SMA + header files + uint64_t loadRemoteData; // remote data acquired by exchange operator. + + uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it + int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. + + uint64_t totalRows; + uint64_t loadRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockAgg; + uint32_t skipBlocks; + uint64_t resultSize; // generated result size in Kb. +} SQueryProfileSummary; + +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + char *pSubplan; // operator tree + uint64_t status; // task status + SQueryProfileSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + +typedef struct SQueryJob { + SArray **pSubtasks; + // todo +} SQueryJob; /** * Process the query job, generated according to the query physical plan. diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index bb9271a3c80e928eadee8ee2c42bbfe22b6e4dce..a2078defdaf1acbe5b9e0824f08a934371345d6c 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -30,6 +30,7 @@ #include "tdef.h" #include "tvariant.h" #include "planner.h" +#include "../../planner/inc/plannerInt.h" namespace { void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_t colId) { @@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) { ASSERT_EQ(ret, 0); struct SQueryPlanNode* n = nullptr; - code = qCreateQueryPlan(pQueryInfo, &n); + code = createQueryPlan(pQueryInfo, &n); char* str = NULL; - qQueryPlanToString(n, &str); + queryPlanToString(n, &str); printf("--------SQL:%s\n", sql); printf("%s\n", str); @@ -155,10 +156,10 @@ TEST(testCase, planner_test) { ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); struct SQueryPlanNode* n = nullptr; - code = qCreateQueryPlan(pQueryInfo, &n); + code = createQueryPlan(pQueryInfo, &n); char* str = NULL; - qQueryPlanToString(n, &str); + queryPlanToString(n, &str); printf("%s\n", str); destroyQueryInfo(pQueryInfo); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 6bd89905b1831eeeaa08049c992889dbc833be76..2231c933629e5acdd3ea7e08b66bb9e8495108e8 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -25,6 +25,19 @@ extern "C" { #include "planner.h" #include "taosmsg.h" +enum LOGIC_PLAN_E { + LP_SCAN = 1, + LP_SESSION = 2, + LP_STATE = 3, + LP_INTERVAL = 4, + LP_FILL = 5, + LP_AGG = 6, + LP_JOIN = 7, + LP_PROJECT = 8, + LP_DISTINCT = 9, + LP_ORDER = 10 +}; + typedef struct SQueryNodeBasicInfo { int32_t type; // operator type char *name; // operator name @@ -57,50 +70,94 @@ typedef struct SQueryPlanNode { struct SQueryPlanNode *nextNode; } SQueryPlanNode; -typedef struct SQueryDistPlanNode { +typedef SSchema SSlotSchema; + +typedef struct SDataBlockSchema { + int32_t index; + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns +} SDataBlockSchema; + +typedef struct SPhyNode { SQueryNodeBasicInfo info; - SSchema *pSchema; // the schema of the input SSDatablock - int32_t numOfCols; // number of input columns - SArray *pExpr; // the query functions or sql aggregations - int32_t numOfExpr; // number of result columns, which is also the number of pExprs - void *pExtInfo; // additional information + SArray *pTargets; // target list to be computed or scanned at this node + SArray *pConditions; // implicitly-ANDed qual conditions + SDataBlockSchema targetSchema; + // children plan to generated result for current node to process + // in case of join, multiple plan nodes exist. + SArray *pChildren; +} SPhyNode; + +typedef struct SScanPhyNode { + SPhyNode node; + uint64_t uid; // unique id of the table +} SScanPhyNode; + +typedef SScanPhyNode STagScanPhyNode; + +typedef SScanPhyNode SSystemTableScanPhyNode; + +typedef struct SMultiTableScanPhyNode { + SScanPhyNode scan; + SArray *pTagsConditions; // implicitly-ANDed tag qual conditions +} SMultiTableScanPhyNode; + +typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode; + +typedef struct SProjectPhyNode { + SPhyNode node; +} SProjectPhyNode; + +/** + * Optimize the query execution plan, currently not implement yet. + * @param pQueryNode + * @return + */ +int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode); - // previous operator to generated result for current node to process - // in case of join, multiple prev nodes exist. - SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources. -} SQueryDistPlanNode; - -typedef struct SQueryCostSummary { - int64_t startTs; // Object created and added into the message queue - int64_t endTs; // the timestamp when the task is completed - int64_t cputime; // total cpu cost, not execute elapsed time - - int64_t loadRemoteDataDuration; // remote io time - int64_t loadNativeDataDuration; // native disk io time - - uint64_t loadNativeData; // blocks + SMA + header files - uint64_t loadRemoteData; // remote data acquired by exchange operator. - - uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it - int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. - - uint64_t totalRows; - uint64_t loadRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockAgg; - uint32_t skipBlocks; - uint64_t resultSize; // generated result size in Kb. -} SQueryCostSummary; - -typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - SQueryDistPlanNode *pNode; // operator tree - uint64_t status; // task status - SQueryCostSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage -} SQueryTask; +/** + * Create the query plan according to the bound AST, which is in the form of pQueryInfo + * @param pQueryInfo + * @param pQueryNode + * @return + */ +int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode); + +/** + * Convert the query plan to string, in order to display it in the shell. + * @param pQueryNode + * @return + */ +int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); + +/** + * Restore the SQL statement according to the logic query plan. + * @param pQueryNode + * @param sql + * @return + */ +int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); + +/** + * Convert to physical plan to string to enable to print it out in the shell. + * @param pPhyNode + * @param str + * @return + */ +int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str); + +/** + * Destroy the query plan object. + * @return + */ +void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode); + +/** + * Destroy the physical plan. + * @param pQueryPhyNode + * @return + */ +void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode); #ifdef __cplusplus } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c new file mode 100644 index 0000000000000000000000000000000000000000..2bdc159af856aabc45fd0ccf508bf139c6d7b622 --- /dev/null +++ b/source/libs/planner/src/physicalPlan.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "plannerInt.h" + +SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { + return NULL; +} + +SPhyNode* createPhyNode(SQueryPlanNode* node) { + switch (node->info.type) { + case LP_SCAN: + return createScanNode(node); + } + return NULL; +} + +SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { + return NULL; +} + +int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { + return 0; +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 79c7691698f670a0ade9d7ae1dd38f5923ce41f5..e54b8472304bf48d62e93470467714c7aec0fc03 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo); static void doDestroyQueryNode(SQueryPlanNode* pQueryNode); int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len); -int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { +int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { return 0; } -int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { +int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo); assert(taosArrayGetSize(upstream) == 1); @@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP return TSDB_CODE_SUCCESS; } -int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { +int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { return 0; } -int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode) { +int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { + return 0; } -int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str) { +int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) { return 0; } -void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { +void* destroyQueryPlan(SQueryPlanNode* pQueryNode) { if (pQueryNode == NULL) { return NULL; } @@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { return NULL; } -void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode) { +void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) { return NULL; } -int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob) { - return 0; -} - //====================================================================================================================== static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, @@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev return len; } -int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { +int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { assert(pQueryNode); *str = calloc(1, 4096);