未验证 提交 33a13ff5 编写于 作者: X xiao-yu-wang 提交者: GitHub

Merge pull request #9089 from taosdata/feature/3.0_wxy

Feature/3.0 wxy Organize planner module interface, and define physical plan structure.
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
enum OPERATOR_TYPE_E { enum OPERATOR_TYPE_E {
OP_TableScan = 1, OP_TableScan = 1,
...@@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E { ...@@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E {
struct SEpSet; struct SEpSet;
struct SQueryPlanNode; struct SQueryPlanNode;
struct SQueryDistPlanNode; struct SPhyNode;
struct SQueryStmtInfo; struct SQueryStmtInfo;
typedef struct SSubquery { typedef struct SSubplan {
int64_t queryId; // the subquery id created by qnode int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL SArray *pDatasource; // the datasource subplan,from which to fetch the result
int32_t level; // the execution level of current subquery, starting from 0. struct SPhyNode *pNode; // physical plan of current subplan
SArray *pUpstream; // the upstream,from which to fetch the result } SSubplan;
struct SQueryDistPlanNode *pNode; // physical plan of current subquery
} SSubquery;
typedef struct SQueryJob { typedef struct SQueryDag {
SArray **pSubqueries; SArray **pSubplans;
int32_t numOfLevels; } SQueryDag;
int32_t currentLevel;
} SQueryJob;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
/**
* Create the query plan according to the bound AST, which is in the form of pQueryInfo
* @param pQueryInfo
* @param pQueryNode
* @return
*/
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
/**
* Convert the query plan to string, in order to display it in the shell.
* @param pQueryNode
* @return
*/
int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
/** /**
* Restore the SQL statement according to the logic query plan. * Create the physical plan for the query, according to the AST.
* @param pQueryNode
* @param sql
* @return
*/ */
int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
/** int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
* Create the physical plan for the query, according to the logic plan.
* @param pQueryNode
* @param pPhyNode
* @return
*/
int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode);
/** /**
* Convert to physical plan to string to enable to print it out in the shell. * Convert to subplan to string for the scheduler to send to the executor
* @param pPhyNode
* @param str
* @return
*/ */
int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str); int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str);
/**
* Destroy the query plan object.
* @return
*/
void* qDestroyQueryPlan(struct SQueryPlanNode* pQueryNode);
/** /**
* Destroy the physical plan. * Destroy the physical plan.
* @param pQueryPhyNode * @param pQueryPhyNode
* @return * @return
*/ */
void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode); void* qDestroyQueryDag(struct SQueryDag* pDag);
/**
* Create the query job from the physical execution plan
* @param pPhyNode
* @param pJob
* @return
*/
int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,7 +20,42 @@ ...@@ -20,7 +20,42 @@
extern "C" { extern "C" {
#endif #endif
struct SQueryJob; typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
int64_t cputime; // total cpu cost, not execute elapsed time
int64_t loadRemoteDataDuration; // remote io time
int64_t loadNativeDataDuration; // native disk io time
uint64_t loadNativeData; // blocks + SMA + header files
uint64_t loadRemoteData; // remote data acquired by exchange operator.
uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it
int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue.
uint64_t totalRows;
uint64_t loadRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockAgg;
uint32_t skipBlocks;
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
char *pSubplan; // operator tree
uint64_t status; // task status
SQueryProfileSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
typedef struct SQueryJob {
SArray **pSubtasks;
// todo
} SQueryJob;
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "tdef.h" #include "tdef.h"
#include "tvariant.h" #include "tvariant.h"
#include "planner.h" #include "planner.h"
#include "../../planner/inc/plannerInt.h"
namespace { namespace {
void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_t colId) { void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_t colId) {
...@@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) { ...@@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
struct SQueryPlanNode* n = nullptr; struct SQueryPlanNode* n = nullptr;
code = qCreateQueryPlan(pQueryInfo, &n); code = createQueryPlan(pQueryInfo, &n);
char* str = NULL; char* str = NULL;
qQueryPlanToString(n, &str); queryPlanToString(n, &str);
printf("--------SQL:%s\n", sql); printf("--------SQL:%s\n", sql);
printf("%s\n", str); printf("%s\n", str);
...@@ -155,10 +156,10 @@ TEST(testCase, planner_test) { ...@@ -155,10 +156,10 @@ TEST(testCase, planner_test) {
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr; struct SQueryPlanNode* n = nullptr;
code = qCreateQueryPlan(pQueryInfo, &n); code = createQueryPlan(pQueryInfo, &n);
char* str = NULL; char* str = NULL;
qQueryPlanToString(n, &str); queryPlanToString(n, &str);
printf("%s\n", str); printf("%s\n", str);
destroyQueryInfo(pQueryInfo); destroyQueryInfo(pQueryInfo);
......
...@@ -25,6 +25,19 @@ extern "C" { ...@@ -25,6 +25,19 @@ extern "C" {
#include "planner.h" #include "planner.h"
#include "taosmsg.h" #include "taosmsg.h"
enum LOGIC_PLAN_E {
LP_SCAN = 1,
LP_SESSION = 2,
LP_STATE = 3,
LP_INTERVAL = 4,
LP_FILL = 5,
LP_AGG = 6,
LP_JOIN = 7,
LP_PROJECT = 8,
LP_DISTINCT = 9,
LP_ORDER = 10
};
typedef struct SQueryNodeBasicInfo { typedef struct SQueryNodeBasicInfo {
int32_t type; // operator type int32_t type; // operator type
char *name; // operator name char *name; // operator name
...@@ -57,50 +70,94 @@ typedef struct SQueryPlanNode { ...@@ -57,50 +70,94 @@ typedef struct SQueryPlanNode {
struct SQueryPlanNode *nextNode; struct SQueryPlanNode *nextNode;
} SQueryPlanNode; } SQueryPlanNode;
typedef struct SQueryDistPlanNode { typedef SSchema SSlotSchema;
typedef struct SDataBlockSchema {
int32_t index;
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
} SDataBlockSchema;
typedef struct SPhyNode {
SQueryNodeBasicInfo info; SQueryNodeBasicInfo info;
SSchema *pSchema; // the schema of the input SSDatablock SArray *pTargets; // target list to be computed or scanned at this node
int32_t numOfCols; // number of input columns SArray *pConditions; // implicitly-ANDed qual conditions
SArray *pExpr; // the query functions or sql aggregations SDataBlockSchema targetSchema;
int32_t numOfExpr; // number of result columns, which is also the number of pExprs // children plan to generated result for current node to process
void *pExtInfo; // additional information // in case of join, multiple plan nodes exist.
SArray *pChildren;
} SPhyNode;
typedef struct SScanPhyNode {
SPhyNode node;
uint64_t uid; // unique id of the table
} SScanPhyNode;
typedef SScanPhyNode STagScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode;
typedef struct SMultiTableScanPhyNode {
SScanPhyNode scan;
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
} SMultiTableScanPhyNode;
typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode;
typedef struct SProjectPhyNode {
SPhyNode node;
} SProjectPhyNode;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
// previous operator to generated result for current node to process /**
// in case of join, multiple prev nodes exist. * Create the query plan according to the bound AST, which is in the form of pQueryInfo
SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources. * @param pQueryInfo
} SQueryDistPlanNode; * @param pQueryNode
* @return
typedef struct SQueryCostSummary { */
int64_t startTs; // Object created and added into the message queue int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
int64_t endTs; // the timestamp when the task is completed
int64_t cputime; // total cpu cost, not execute elapsed time /**
* Convert the query plan to string, in order to display it in the shell.
int64_t loadRemoteDataDuration; // remote io time * @param pQueryNode
int64_t loadNativeDataDuration; // native disk io time * @return
*/
uint64_t loadNativeData; // blocks + SMA + header files int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
uint64_t loadRemoteData; // remote data acquired by exchange operator.
/**
uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it * Restore the SQL statement according to the logic query plan.
int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. * @param pQueryNode
* @param sql
uint64_t totalRows; * @return
uint64_t loadRows; */
uint32_t totalBlocks; int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
uint32_t loadBlocks;
uint32_t loadBlockAgg; /**
uint32_t skipBlocks; * Convert to physical plan to string to enable to print it out in the shell.
uint64_t resultSize; // generated result size in Kb. * @param pPhyNode
} SQueryCostSummary; * @param str
* @return
typedef struct SQueryTask { */
uint64_t queryId; // query id int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str);
uint64_t taskId; // task id
SQueryDistPlanNode *pNode; // operator tree /**
uint64_t status; // task status * Destroy the query plan object.
SQueryCostSummary summary; // task execution summary * @return
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage */
} SQueryTask; void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "plannerInt.h"
SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) {
return NULL;
}
SPhyNode* createPhyNode(SQueryPlanNode* node) {
switch (node->info.type) {
case LP_SCAN:
return createScanNode(node);
}
return NULL;
}
SPhyNode* createSubplan(SQueryPlanNode* pSubquery) {
return NULL;
}
int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
return 0;
}
...@@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo); ...@@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode); static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len); int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0; return 0;
} }
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo); SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
assert(taosArrayGetSize(upstream) == 1); assert(taosArrayGetSize(upstream) == 1);
...@@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP ...@@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
return 0; return 0;
} }
int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode) { int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
return 0; return 0;
} }
int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str) { int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) {
return 0; return 0;
} }
void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { void* destroyQueryPlan(SQueryPlanNode* pQueryNode) {
if (pQueryNode == NULL) { if (pQueryNode == NULL) {
return NULL; return NULL;
} }
...@@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { ...@@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) {
return NULL; return NULL;
} }
void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode) { void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) {
return NULL; return NULL;
} }
int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob) {
return 0;
}
//====================================================================================================================== //======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
...@@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev ...@@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
return len; return len;
} }
int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
assert(pQueryNode); assert(pQueryNode);
*str = calloc(1, 4096); *str = calloc(1, 4096);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册