Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cd7a0e55
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cd7a0e55
编写于
2月 28, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-13747 New SQL model integration
上级
768e0593
变更
22
展开全部
隐藏空白更改
内联
并排
Showing
22 changed file
with
1086 addition
and
3572 deletion
+1086
-3572
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+0
-1
include/libs/parser/parser.h
include/libs/parser/parser.h
+0
-81
include/libs/planner/planner.h
include/libs/planner/planner.h
+0
-194
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+6
-8
source/libs/parser/inc/astCreateFuncs.h
source/libs/parser/inc/astCreateFuncs.h
+9
-2
source/libs/parser/inc/parserInt.h
source/libs/parser/inc/parserInt.h
+3
-4
source/libs/parser/inc/parserUtil.h
source/libs/parser/inc/parserUtil.h
+3
-4
source/libs/parser/src/astCreateContext.c
source/libs/parser/src/astCreateContext.c
+0
-29
source/libs/parser/src/astParse.c
source/libs/parser/src/astParse.c
+239
-0
source/libs/parser/src/astTranslate.c
source/libs/parser/src/astTranslate.c
+1
-235
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+10
-2
source/libs/parser/test/parserTest.cpp
source/libs/parser/test/parserTest.cpp
+1
-1
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+20
-2
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+276
-577
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+480
-0
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+0
-1227
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+15
-1
source/libs/planner/src/plannerImpl.c
source/libs/planner/src/plannerImpl.c
+0
-868
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+0
-208
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+3
-2
source/libs/planner/test/plannerTestMain.cpp
source/libs/planner/test/plannerTestMain.cpp
+20
-19
source/libs/planner/test/plannerTests.cpp
source/libs/planner/test/plannerTests.cpp
+0
-107
未找到文件。
include/libs/nodes/plannodes.h
浏览文件 @
cd7a0e55
...
...
@@ -22,7 +22,6 @@ extern "C" {
#include "querynodes.h"
#include "query.h"
#include "tmsg.h"
typedef
struct
SLogicNode
{
ENodeType
type
;
...
...
include/libs/parser/parser.h
浏览文件 @
cd7a0e55
...
...
@@ -20,86 +20,7 @@
extern
"C"
{
#endif
#if 0
#include "parsenodes.h"
typedef struct SParseContext {
uint64_t requestId;
int32_t acctId;
const char *db;
void *pTransporter;
SEpSet mgmtEpSet;
const char *pSql; // sql string
size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
} SParseContext;
/**
* Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST.
* @param pSql sql string
* @param length length of the sql string
* @param id operator id, generated by uuid generator
* @param msg extended error message if exists.
* @return error code
*/
int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQueryNode);
/**
* Return true if it is a ddl/dcl sql statement
* @param pQuery
* @return
*/
bool qIsDdlQuery(const SQueryNode* pQueryNode);
/**
* Destroy logic query plan
* @param pQueryNode
*/
void qDestroyQuery(SQueryNode* pQueryNode);
/**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
* involved in the subscribe procedure.
* @param pSql
* @param length
* @param pConvertSql
* @return
*/
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql);
void assignExprInfo(SExprInfo* dst, const SExprInfo* src);
void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
void columnListDestroy(SArray* pColumnList);
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
void dropOneLevelExprInfo(SArray* pExprInfo);
typedef struct SSourceParam {
SArray *pExprNodeList; //Array<struct tExprNode*>
SArray *pColumnList; //Array<struct SColumn>
int32_t num;
} SSourceParam;
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy);
int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
#else
#include "querynodes.h"
#include "tmsg.h"
typedef
struct
SParseContext
{
uint64_t
requestId
;
...
...
@@ -125,8 +46,6 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
void
qDestroyQuery
(
SQuery
*
pQueryNode
);
#endif
#ifdef __cplusplus
}
#endif
...
...
include/libs/planner/planner.h
浏览文件 @
cd7a0e55
...
...
@@ -20,199 +20,7 @@
extern
"C"
{
#endif
#if 0
#include "query.h"
#include "tmsg.h"
#include "tarray.h"
#include "trpc.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
enum OPERATOR_TYPE_E {
OP_Unknown,
#define INCLUDE_AS_ENUM
#include "plannerOp.h"
#undef INCLUDE_AS_ENUM
OP_TotalNum
};
enum DATASINK_TYPE_E {
DSINK_Unknown,
DSINK_Dispatch,
DSINK_Insert,
DSINK_TotalNum
};
struct SEpSet;
struct SQueryStmtInfo;
typedef SSchema SSlotSchema;
typedef struct SDataBlockSchema {
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
int32_t resultRowSize;
int16_t precision;
} SDataBlockSchema;
typedef struct SQueryNodeBasicInfo {
int32_t type; // operator type
const char *name; // operator name
} SQueryNodeBasicInfo;
typedef struct SDataSink {
SQueryNodeBasicInfo info;
SDataBlockSchema schema;
} SDataSink;
typedef struct SDataDispatcher {
SDataSink sink;
} SDataDispatcher;
typedef struct SDataInserter {
SDataSink sink;
int32_t numOfTables;
uint32_t size;
char *pData;
} SDataInserter;
typedef struct SPhyNode {
SQueryNodeBasicInfo info;
SArray *pTargets; // target list to be computed or scanned at this node
SArray *pConditions; // implicitly-ANDed qual conditions
SDataBlockSchema targetSchema;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray *pChildren;
struct SPhyNode *pParent;
} SPhyNode;
typedef struct SScanPhyNode {
SPhyNode node;
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
} SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode;
typedef SScanPhyNode STagScanPhyNode;
typedef struct STableScanPhyNode {
SScanPhyNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow window;
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
} STableScanPhyNode;
typedef STableScanPhyNode STableSeqScanPhyNode;
typedef struct SProjectPhyNode {
SPhyNode node;
} SProjectPhyNode;
typedef struct SDownstreamSource {
SQueryNodeAddr addr;
uint64_t taskId;
uint64_t schedId;
} SDownstreamSource;
typedef struct SExchangePhyNode {
SPhyNode node;
uint64_t srcTemplateId; // template id of datasource suplans
SArray *pSrcEndPoints; // SArray<SDownstreamSource>, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhyNode;
typedef enum EAggAlgo {
AGG_ALGO_PLAIN = 1, // simple agg across all input rows
AGG_ALGO_SORTED, // grouped agg, input must be sorted
AGG_ALGO_HASHED // grouped agg, use internal hashtable
} EAggAlgo;
typedef enum EAggSplit {
AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result
AGG_SPLIT_FINAL // second level agg, must calculate the final result
} EAggSplit;
typedef struct SAggPhyNode {
SPhyNode node;
EAggAlgo aggAlgo; // algorithm used by agg operator
EAggSplit aggSplit; // distributed splitting mode
SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function
SArray *pGroupByList; // SColIndex list, but these must be column node
} SAggPhyNode;
typedef struct SSubplanId {
uint64_t queryId;
uint64_t templateId;
uint64_t subplanId;
} SSubplanId;
typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray *pChildren; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef struct SQueryDag {
uint64_t queryId;
int32_t numOfSubplans;
SArray *pSubplans; // SArray*<SArray*<SSubplan*>>. The execution level of subplan, starting from 0.
} SQueryDag;
struct SQueryNode;
/**
* Create the physical plan for the query, according to the AST.
* @param pQueryInfo
* @param pDag
* @param requestId
* @return
*/
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
/**
* Convert to subplan to string for the scheduler to send to the executor
*/
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len);
int32_t qStringToSubplan(const char* str, SSubplan** subplan);
void qDestroySubplan(SSubplan* pSubplan);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void qDestroyQueryDag(SQueryDag* pDag);
char* qDagToString(const SQueryDag* pDag);
SQueryDag* qStringToDag(const char* pStr);
#else
#include "plannodes.h"
#include "query.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
...
...
@@ -266,8 +74,6 @@ SQueryPlan* qStringToQueryPlan(const char* pStr);
void
qDestroyQueryPlan
(
SQueryPlan
*
pPlan
);
#endif
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
cd7a0e55
...
...
@@ -198,20 +198,18 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pDag
,
SArray
*
pNodeList
)
{
// pRequest->type = pQuery->type;
// SSchema* pSchema = NULL;
// int32_t numOfCols = 0;
// int32_t code = qCreateQueryDag(pQuery, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
// if (code != 0) {
// return code;
// }
SPlanContext
cxt
=
{
.
queryId
=
pRequest
->
requestId
,
.
pAstRoot
=
pQuery
->
pRoot
};
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
pDag
);
if
(
code
!=
0
)
{
return
code
;
}
// if (pQuery->type == TSDB_SQL_SELECT) {
// setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
// pRequest->type = TDMT_VND_QUERY;
// }
// tfree(pSchema);
// return code;
return
code
;
}
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
)
{
...
...
source/libs/parser/inc/astCreateFuncs.h
浏览文件 @
cd7a0e55
...
...
@@ -20,11 +20,18 @@
extern
"C"
{
#endif
#include "querynodes.h"
#include "nodesShowStmts.h"
#include "astCreateContext.h"
#include "parser.h"
#include "querynodes.h"
#include "ttoken.h"
typedef
struct
SAstCreateContext
{
SParseContext
*
pQueryCxt
;
bool
notSupport
;
bool
valid
;
SNode
*
pRootNode
;
}
SAstCreateContext
;
extern
SToken
nil_token
;
SNode
*
createRawExprNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pToken
,
SNode
*
pNode
);
...
...
source/libs/parser/inc/parserI
mpl
.h
→
source/libs/parser/inc/parserI
nt
.h
浏览文件 @
cd7a0e55
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSER_I
MPL
_H_
#define _TD_PARSER_I
MPL
_H_
#ifndef _TD_PARSER_I
NT
_H_
#define _TD_PARSER_I
NT
_H_
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -24,10 +24,9 @@ extern "C" {
int32_t
doParse
(
SParseContext
*
pParseCxt
,
SQuery
**
pQuery
);
int32_t
doTranslate
(
SParseContext
*
pParseCxt
,
SQuery
*
pQuery
);
int32_t
parseQuerySql
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_PARSER_I
MPL
_H_*/
#endif
/*_TD_PARSER_I
NT
_H_*/
source/libs/parser/inc/parserUtil.h
浏览文件 @
cd7a0e55
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_PARSERUTIL_H
#define TDENGINE_PARSERUTIL_H
#ifndef TDENGINE_PARSER
_
UTIL_H
#define TDENGINE_PARSER
_
UTIL_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -22,7 +22,6 @@ extern "C" {
#include "os.h"
#include "query.h"
#include "tmsg.h"
#include "ttoken.h"
typedef
struct
SMsgBuf
{
...
...
@@ -54,4 +53,4 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta);
}
#endif
#endif // TDENGINE_PARSERUTIL_H
#endif // TDENGINE_PARSER
_
UTIL_H
source/libs/parser/src/astCreateContext.c
已删除
100644 → 0
浏览文件 @
768e0593
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ttoken.h"
#include "astCreateContext.h"
int32_t
createAstCreateContext
(
SParseContext
*
pQueryCxt
,
SAstCreateContext
*
pCxt
)
{
pCxt
->
pQueryCxt
=
pQueryCxt
;
pCxt
->
notSupport
=
false
;
pCxt
->
valid
=
true
;
pCxt
->
pRootNode
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
destroyAstCreateContext
(
SAstCreateContext
*
pCxt
)
{
return
TSDB_CODE_SUCCESS
;
}
source/libs/parser/src/astParse.c
0 → 100644
浏览文件 @
cd7a0e55
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "parserInt.h"
#include "astCreateFuncs.h"
#include "ttoken.h"
typedef
void
*
(
*
FMalloc
)(
size_t
);
typedef
void
(
*
FFree
)(
void
*
);
extern
void
*
NewParseAlloc
(
FMalloc
);
extern
void
NewParse
(
void
*
,
int
,
SToken
,
void
*
);
extern
void
NewParseFree
(
void
*
,
FFree
);
extern
void
NewParseTrace
(
FILE
*
,
char
*
);
static
uint32_t
toNewTokenId
(
uint32_t
tokenId
)
{
switch
(
tokenId
)
{
case
TK_OR
:
return
NEW_TK_OR
;
case
TK_AND
:
return
NEW_TK_AND
;
case
TK_UNION
:
return
NEW_TK_UNION
;
case
TK_ALL
:
return
NEW_TK_ALL
;
case
TK_MINUS
:
return
NEW_TK_NK_MINUS
;
case
TK_PLUS
:
return
NEW_TK_NK_PLUS
;
case
TK_STAR
:
return
NEW_TK_NK_STAR
;
case
TK_SLASH
:
return
NEW_TK_NK_SLASH
;
case
TK_REM
:
return
NEW_TK_NK_REM
;
case
TK_SHOW
:
return
NEW_TK_SHOW
;
case
TK_DATABASES
:
return
NEW_TK_DATABASES
;
case
TK_INTEGER
:
return
NEW_TK_NK_INTEGER
;
case
TK_FLOAT
:
return
NEW_TK_NK_FLOAT
;
case
TK_STRING
:
return
NEW_TK_NK_STRING
;
case
TK_BOOL
:
return
NEW_TK_NK_BOOL
;
case
TK_TIMESTAMP
:
return
NEW_TK_TIMESTAMP
;
case
TK_VARIABLE
:
return
NEW_TK_NK_VARIABLE
;
case
TK_COMMA
:
return
NEW_TK_NK_COMMA
;
case
TK_ID
:
return
NEW_TK_NK_ID
;
case
TK_LP
:
return
NEW_TK_NK_LP
;
case
TK_RP
:
return
NEW_TK_NK_RP
;
case
TK_DOT
:
return
NEW_TK_NK_DOT
;
case
TK_BETWEEN
:
return
NEW_TK_BETWEEN
;
case
TK_NOT
:
return
NEW_TK_NOT
;
case
TK_IS
:
return
NEW_TK_IS
;
case
TK_NULL
:
return
NEW_TK_NULL
;
case
TK_LT
:
return
NEW_TK_NK_LT
;
case
TK_GT
:
return
NEW_TK_NK_GT
;
case
TK_LE
:
return
NEW_TK_NK_LE
;
case
TK_GE
:
return
NEW_TK_NK_GE
;
case
TK_NE
:
return
NEW_TK_NK_NE
;
case
TK_EQ
:
return
NEW_TK_NK_EQ
;
case
TK_LIKE
:
return
NEW_TK_LIKE
;
case
TK_MATCH
:
return
NEW_TK_MATCH
;
case
TK_NMATCH
:
return
NEW_TK_NMATCH
;
case
TK_IN
:
return
NEW_TK_IN
;
case
TK_SELECT
:
return
NEW_TK_SELECT
;
case
TK_DISTINCT
:
return
NEW_TK_DISTINCT
;
case
TK_WHERE
:
return
NEW_TK_WHERE
;
case
TK_AS
:
return
NEW_TK_AS
;
case
TK_FROM
:
return
NEW_TK_FROM
;
case
TK_JOIN
:
return
NEW_TK_JOIN
;
// case TK_PARTITION:
// return NEW_TK_PARTITION;
case
TK_SESSION
:
return
NEW_TK_SESSION
;
case
TK_STATE_WINDOW
:
return
NEW_TK_STATE_WINDOW
;
case
TK_INTERVAL
:
return
NEW_TK_INTERVAL
;
case
TK_SLIDING
:
return
NEW_TK_SLIDING
;
case
TK_FILL
:
return
NEW_TK_FILL
;
// case TK_VALUE:
// return NEW_TK_VALUE;
case
TK_NONE
:
return
NEW_TK_NONE
;
case
TK_PREV
:
return
NEW_TK_PREV
;
case
TK_LINEAR
:
return
NEW_TK_LINEAR
;
// case TK_NEXT:
// return NEW_TK_NEXT;
case
TK_GROUP
:
return
NEW_TK_GROUP
;
case
TK_HAVING
:
return
NEW_TK_HAVING
;
case
TK_ORDER
:
return
NEW_TK_ORDER
;
case
TK_BY
:
return
NEW_TK_BY
;
case
TK_ASC
:
return
NEW_TK_ASC
;
case
TK_DESC
:
return
NEW_TK_DESC
;
case
TK_SLIMIT
:
return
NEW_TK_SLIMIT
;
case
TK_SOFFSET
:
return
NEW_TK_SOFFSET
;
case
TK_LIMIT
:
return
NEW_TK_LIMIT
;
case
TK_OFFSET
:
return
NEW_TK_OFFSET
;
case
TK_SPACE
:
case
NEW_TK_ON
:
case
NEW_TK_INNER
:
break
;
default:
printf
(
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!tokenId = %d
\n
"
,
tokenId
);
}
return
tokenId
;
}
static
uint32_t
getToken
(
const
char
*
z
,
uint32_t
*
tokenId
)
{
uint32_t
n
=
tGetToken
(
z
,
tokenId
);
*
tokenId
=
toNewTokenId
(
*
tokenId
);
return
n
;
}
static
bool
isCmd
(
const
SNode
*
pRootNode
)
{
if
(
NULL
==
pRootNode
)
{
return
true
;
}
switch
(
nodeType
(
pRootNode
))
{
case
QUERY_NODE_SELECT_STMT
:
return
false
;
default:
break
;
}
return
true
;
}
int32_t
doParse
(
SParseContext
*
pParseCxt
,
SQuery
**
pQuery
)
{
SAstCreateContext
cxt
=
{
.
pQueryCxt
=
pParseCxt
,
.
notSupport
=
false
,
.
valid
=
true
,
.
pRootNode
=
NULL
};
void
*
pParser
=
NewParseAlloc
(
malloc
);
int32_t
i
=
0
;
while
(
1
)
{
SToken
t0
=
{
0
};
if
(
cxt
.
pQueryCxt
->
pSql
[
i
]
==
0
)
{
NewParse
(
pParser
,
0
,
t0
,
&
cxt
);
goto
abort_parse
;
}
t0
.
n
=
getToken
((
char
*
)
&
cxt
.
pQueryCxt
->
pSql
[
i
],
&
t0
.
type
);
t0
.
z
=
(
char
*
)(
cxt
.
pQueryCxt
->
pSql
+
i
);
i
+=
t0
.
n
;
switch
(
t0
.
type
)
{
case
TK_SPACE
:
case
TK_COMMENT
:
{
break
;
}
case
TK_SEMI
:
{
NewParse
(
pParser
,
0
,
t0
,
&
cxt
);
goto
abort_parse
;
}
case
TK_QUESTION
:
case
TK_ILLEGAL
:
{
snprintf
(
cxt
.
pQueryCxt
->
pMsg
,
cxt
.
pQueryCxt
->
msgLen
,
"unrecognized token:
\"
%s
\"
"
,
t0
.
z
);
cxt
.
valid
=
false
;
goto
abort_parse
;
}
case
TK_HEX
:
case
TK_OCT
:
case
TK_BIN
:
{
snprintf
(
cxt
.
pQueryCxt
->
pMsg
,
cxt
.
pQueryCxt
->
msgLen
,
"unsupported token:
\"
%s
\"
"
,
t0
.
z
);
cxt
.
valid
=
false
;
goto
abort_parse
;
}
default:
NewParse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
// NewParseTrace(stdout, "");
if
(
!
cxt
.
valid
)
{
goto
abort_parse
;
}
}
}
abort_parse:
NewParseFree
(
pParser
,
free
);
if
(
cxt
.
valid
)
{
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuery
));
(
*
pQuery
)
->
isCmd
=
isCmd
(
cxt
.
pRootNode
);
(
*
pQuery
)
->
pRoot
=
cxt
.
pRootNode
;
}
return
cxt
.
valid
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_FAILED
;
}
source/libs/parser/src/
parserImpl
.c
→
source/libs/parser/src/
astTranslate
.c
浏览文件 @
cd7a0e55
...
...
@@ -13,238 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "parserI
mpl
.h"
#include "parserI
nt
.h"
#include "astCreateContext.h"
#include "catalog.h"
#include "functionMgt.h"
#include "parserUtil.h"
#include "tglobal.h"
#include "tname.h"
#include "ttime.h"
#include "ttoken.h"
typedef
void
*
(
*
FMalloc
)(
size_t
);
typedef
void
(
*
FFree
)(
void
*
);
extern
void
*
NewParseAlloc
(
FMalloc
);
extern
void
NewParse
(
void
*
,
int
,
SToken
,
void
*
);
extern
void
NewParseFree
(
void
*
,
FFree
);
extern
void
NewParseTrace
(
FILE
*
,
char
*
);
static
uint32_t
toNewTokenId
(
uint32_t
tokenId
)
{
switch
(
tokenId
)
{
case
TK_OR
:
return
NEW_TK_OR
;
case
TK_AND
:
return
NEW_TK_AND
;
case
TK_UNION
:
return
NEW_TK_UNION
;
case
TK_ALL
:
return
NEW_TK_ALL
;
case
TK_MINUS
:
return
NEW_TK_NK_MINUS
;
case
TK_PLUS
:
return
NEW_TK_NK_PLUS
;
case
TK_STAR
:
return
NEW_TK_NK_STAR
;
case
TK_SLASH
:
return
NEW_TK_NK_SLASH
;
case
TK_REM
:
return
NEW_TK_NK_REM
;
case
TK_SHOW
:
return
NEW_TK_SHOW
;
case
TK_DATABASES
:
return
NEW_TK_DATABASES
;
case
TK_INTEGER
:
return
NEW_TK_NK_INTEGER
;
case
TK_FLOAT
:
return
NEW_TK_NK_FLOAT
;
case
TK_STRING
:
return
NEW_TK_NK_STRING
;
case
TK_BOOL
:
return
NEW_TK_NK_BOOL
;
case
TK_TIMESTAMP
:
return
NEW_TK_TIMESTAMP
;
case
TK_VARIABLE
:
return
NEW_TK_NK_VARIABLE
;
case
TK_COMMA
:
return
NEW_TK_NK_COMMA
;
case
TK_ID
:
return
NEW_TK_NK_ID
;
case
TK_LP
:
return
NEW_TK_NK_LP
;
case
TK_RP
:
return
NEW_TK_NK_RP
;
case
TK_DOT
:
return
NEW_TK_NK_DOT
;
case
TK_BETWEEN
:
return
NEW_TK_BETWEEN
;
case
TK_NOT
:
return
NEW_TK_NOT
;
case
TK_IS
:
return
NEW_TK_IS
;
case
TK_NULL
:
return
NEW_TK_NULL
;
case
TK_LT
:
return
NEW_TK_NK_LT
;
case
TK_GT
:
return
NEW_TK_NK_GT
;
case
TK_LE
:
return
NEW_TK_NK_LE
;
case
TK_GE
:
return
NEW_TK_NK_GE
;
case
TK_NE
:
return
NEW_TK_NK_NE
;
case
TK_EQ
:
return
NEW_TK_NK_EQ
;
case
TK_LIKE
:
return
NEW_TK_LIKE
;
case
TK_MATCH
:
return
NEW_TK_MATCH
;
case
TK_NMATCH
:
return
NEW_TK_NMATCH
;
case
TK_IN
:
return
NEW_TK_IN
;
case
TK_SELECT
:
return
NEW_TK_SELECT
;
case
TK_DISTINCT
:
return
NEW_TK_DISTINCT
;
case
TK_WHERE
:
return
NEW_TK_WHERE
;
case
TK_AS
:
return
NEW_TK_AS
;
case
TK_FROM
:
return
NEW_TK_FROM
;
case
TK_JOIN
:
return
NEW_TK_JOIN
;
// case TK_PARTITION:
// return NEW_TK_PARTITION;
case
TK_SESSION
:
return
NEW_TK_SESSION
;
case
TK_STATE_WINDOW
:
return
NEW_TK_STATE_WINDOW
;
case
TK_INTERVAL
:
return
NEW_TK_INTERVAL
;
case
TK_SLIDING
:
return
NEW_TK_SLIDING
;
case
TK_FILL
:
return
NEW_TK_FILL
;
// case TK_VALUE:
// return NEW_TK_VALUE;
case
TK_NONE
:
return
NEW_TK_NONE
;
case
TK_PREV
:
return
NEW_TK_PREV
;
case
TK_LINEAR
:
return
NEW_TK_LINEAR
;
// case TK_NEXT:
// return NEW_TK_NEXT;
case
TK_GROUP
:
return
NEW_TK_GROUP
;
case
TK_HAVING
:
return
NEW_TK_HAVING
;
case
TK_ORDER
:
return
NEW_TK_ORDER
;
case
TK_BY
:
return
NEW_TK_BY
;
case
TK_ASC
:
return
NEW_TK_ASC
;
case
TK_DESC
:
return
NEW_TK_DESC
;
case
TK_SLIMIT
:
return
NEW_TK_SLIMIT
;
case
TK_SOFFSET
:
return
NEW_TK_SOFFSET
;
case
TK_LIMIT
:
return
NEW_TK_LIMIT
;
case
TK_OFFSET
:
return
NEW_TK_OFFSET
;
case
TK_SPACE
:
case
NEW_TK_ON
:
case
NEW_TK_INNER
:
break
;
default:
printf
(
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!tokenId = %d
\n
"
,
tokenId
);
}
return
tokenId
;
}
static
uint32_t
getToken
(
const
char
*
z
,
uint32_t
*
tokenId
)
{
uint32_t
n
=
tGetToken
(
z
,
tokenId
);
*
tokenId
=
toNewTokenId
(
*
tokenId
);
return
n
;
}
static
bool
isCmd
(
const
SNode
*
pRootNode
)
{
if
(
NULL
==
pRootNode
)
{
return
true
;
}
switch
(
nodeType
(
pRootNode
))
{
case
QUERY_NODE_SELECT_STMT
:
return
false
;
default:
break
;
}
return
true
;
}
int32_t
doParse
(
SParseContext
*
pParseCxt
,
SQuery
**
pQuery
)
{
SAstCreateContext
cxt
;
createAstCreateContext
(
pParseCxt
,
&
cxt
);
void
*
pParser
=
NewParseAlloc
(
malloc
);
int32_t
i
=
0
;
while
(
1
)
{
SToken
t0
=
{
0
};
if
(
cxt
.
pQueryCxt
->
pSql
[
i
]
==
0
)
{
NewParse
(
pParser
,
0
,
t0
,
&
cxt
);
goto
abort_parse
;
}
t0
.
n
=
getToken
((
char
*
)
&
cxt
.
pQueryCxt
->
pSql
[
i
],
&
t0
.
type
);
t0
.
z
=
(
char
*
)(
cxt
.
pQueryCxt
->
pSql
+
i
);
i
+=
t0
.
n
;
switch
(
t0
.
type
)
{
case
TK_SPACE
:
case
TK_COMMENT
:
{
break
;
}
case
TK_SEMI
:
{
NewParse
(
pParser
,
0
,
t0
,
&
cxt
);
goto
abort_parse
;
}
case
TK_QUESTION
:
case
TK_ILLEGAL
:
{
snprintf
(
cxt
.
pQueryCxt
->
pMsg
,
cxt
.
pQueryCxt
->
msgLen
,
"unrecognized token:
\"
%s
\"
"
,
t0
.
z
);
cxt
.
valid
=
false
;
goto
abort_parse
;
}
case
TK_HEX
:
case
TK_OCT
:
case
TK_BIN
:
{
snprintf
(
cxt
.
pQueryCxt
->
pMsg
,
cxt
.
pQueryCxt
->
msgLen
,
"unsupported token:
\"
%s
\"
"
,
t0
.
z
);
cxt
.
valid
=
false
;
goto
abort_parse
;
}
default:
NewParse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
// NewParseTrace(stdout, "");
if
(
!
cxt
.
valid
)
{
goto
abort_parse
;
}
}
}
abort_parse:
NewParseFree
(
pParser
,
free
);
destroyAstCreateContext
(
&
cxt
);
if
(
cxt
.
valid
)
{
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuery
));
(
*
pQuery
)
->
isCmd
=
isCmd
(
cxt
.
pRootNode
);
(
*
pQuery
)
->
pRoot
=
cxt
.
pRootNode
;
}
return
cxt
.
valid
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_FAILED
;
}
static
bool
afterGroupBy
(
ESqlClause
clause
)
{
return
clause
>
SQL_CLAUSE_GROUP_BY
;
...
...
@@ -1064,11 +838,3 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
}
return
code
;
}
int32_t
parseQuerySql
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
)
{
int32_t
code
=
doParse
(
pCxt
,
pQuery
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
doTranslate
(
pCxt
,
*
pQuery
);
}
return
code
;
}
source/libs/parser/src/parser.c
浏览文件 @
cd7a0e55
...
...
@@ -267,7 +267,7 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
#include "parser.h"
#include "insertParser.h"
#include "parserI
mpl
.h"
#include "parserI
nt
.h"
#include "ttoken.h"
static
bool
isInsertSql
(
const
char
*
pStr
,
size_t
length
)
{
...
...
@@ -281,11 +281,19 @@ static bool isInsertSql(const char* pStr, size_t length) {
}
while
(
1
);
}
static
int32_t
parseSqlIntoAst
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
)
{
int32_t
code
=
doParse
(
pCxt
,
pQuery
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
doTranslate
(
pCxt
,
*
pQuery
);
}
return
code
;
}
int32_t
qParseQuerySql
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
)
{
if
(
isInsertSql
(
pCxt
->
pSql
,
pCxt
->
sqlLen
))
{
return
parseInsertSql
(
pCxt
,
pQuery
);
}
else
{
return
parse
QuerySql
(
pCxt
,
pQuery
);
return
parse
SqlIntoAst
(
pCxt
,
pQuery
);
}
}
...
...
source/libs/parser/test/
newP
arserTest.cpp
→
source/libs/parser/test/
p
arserTest.cpp
浏览文件 @
cd7a0e55
...
...
@@ -18,7 +18,7 @@
#include <gtest/gtest.h>
#include "parserI
mpl
.h"
#include "parserI
nt
.h"
using
namespace
std
;
using
namespace
testing
;
...
...
source/libs/planner/inc/plannerI
mpl
.h
→
source/libs/planner/inc/plannerI
nt
.h
浏览文件 @
cd7a0e55
...
...
@@ -20,11 +20,29 @@
extern
"C"
{
#endif
#include "plannodes.h"
#include "planner.h"
int32_t
createLogicPlan
(
SNode
*
pNode
,
SLogicNode
**
pLogicNode
);
#define CHECK_ALLOC(p, res) \
do { \
if (NULL == (p)) { \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
return (res); \
} \
} while (0)
#define CHECK_CODE(exec, res) \
do { \
int32_t code = (exec); \
if (TSDB_CODE_SUCCESS != code) { \
pCxt->errCode = code; \
return (res); \
} \
} while (0)
int32_t
createLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
**
pLogicNode
);
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
int32_t
createPhysiPlan
(
SLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
);
int32_t
buildPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
);
#ifdef __cplusplus
}
...
...
source/libs/planner/src/logicPlan.c
浏览文件 @
cd7a0e55
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlan.c
浏览文件 @
cd7a0e55
...
...
@@ -492,3 +492,483 @@ void qDestroySubplan(SSubplan* pSubplan) {
}
#endif
#include "plannerInt.h"
#include "functionMgt.h"
typedef
struct
SSubLogicPlan
{
SNode
*
pRoot
;
// SLogicNode
bool
haveSuperTable
;
bool
haveSystemTable
;
}
SSubLogicPlan
;
int32_t
splitLogicPlan
(
SSubLogicPlan
*
pLogicPlan
)
{
// todo
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SSlotIndex
{
int16_t
dataBlockId
;
int16_t
slotId
;
}
SSlotIndex
;
typedef
struct
SPhysiPlanContext
{
int32_t
errCode
;
int16_t
nextDataBlockId
;
SArray
*
pLocationHelper
;
}
SPhysiPlanContext
;
static
int32_t
getSlotKey
(
SNode
*
pNode
,
char
*
pKey
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
if
(
'\0'
==
pCol
->
tableAlias
[
0
])
{
return
sprintf
(
pKey
,
"%s"
,
pCol
->
colName
);
}
return
sprintf
(
pKey
,
"%s.%s"
,
pCol
->
tableAlias
,
pCol
->
colName
);
}
return
sprintf
(
pKey
,
"%s"
,
((
SExprNode
*
)
pNode
)
->
aliasName
);
}
static
SNode
*
createSlotDesc
(
SPhysiPlanContext
*
pCxt
,
const
SNode
*
pNode
,
int16_t
slotId
)
{
SSlotDescNode
*
pSlot
=
(
SSlotDescNode
*
)
nodesMakeNode
(
QUERY_NODE_SLOT_DESC
);
CHECK_ALLOC
(
pSlot
,
NULL
);
pSlot
->
slotId
=
slotId
;
pSlot
->
dataType
=
((
SExprNode
*
)
pNode
)
->
resType
;
pSlot
->
reserve
=
false
;
pSlot
->
output
=
false
;
return
(
SNode
*
)
pSlot
;
}
static
SNode
*
createTarget
(
SNode
*
pNode
,
int16_t
dataBlockId
,
int16_t
slotId
)
{
STargetNode
*
pTarget
=
(
STargetNode
*
)
nodesMakeNode
(
QUERY_NODE_TARGET
);
if
(
NULL
==
pTarget
)
{
return
NULL
;
}
pTarget
->
dataBlockId
=
dataBlockId
;
pTarget
->
slotId
=
slotId
;
pTarget
->
pExpr
=
pNode
;
return
(
SNode
*
)
pTarget
;
}
static
int32_t
addDataBlockDesc
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
SHashObj
*
pHash
=
NULL
;
if
(
NULL
==
pDataBlockDesc
->
pSlots
)
{
pDataBlockDesc
->
pSlots
=
nodesMakeList
();
CHECK_ALLOC
(
pDataBlockDesc
->
pSlots
,
TSDB_CODE_OUT_OF_MEMORY
);
pHash
=
taosHashInit
(
LIST_LENGTH
(
pList
),
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
CHECK_ALLOC
(
pHash
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
NULL
==
taosArrayInsert
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
,
&
pHash
))
{
taosHashCleanup
(
pHash
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
else
{
pHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
);
}
SNode
*
pNode
=
NULL
;
int16_t
slotId
=
taosHashGetSize
(
pHash
);
FOREACH
(
pNode
,
pList
)
{
SNode
*
pSlot
=
createSlotDesc
(
pCxt
,
pNode
,
slotId
);
CHECK_ALLOC
(
pSlot
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pDataBlockDesc
->
pSlots
,
(
SNode
*
)
pSlot
))
{
nodesDestroyNode
(
pSlot
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SSlotIndex
index
=
{
.
dataBlockId
=
pDataBlockDesc
->
dataBlockId
,
.
slotId
=
slotId
};
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
int32_t
len
=
getSlotKey
(
pNode
,
name
);
CHECK_CODE
(
taosHashPut
(
pHash
,
name
,
len
,
&
index
,
sizeof
(
SSlotIndex
)),
TSDB_CODE_OUT_OF_MEMORY
);
SNode
*
pTarget
=
createTarget
(
pNode
,
pDataBlockDesc
->
dataBlockId
,
slotId
);
CHECK_ALLOC
(
pTarget
,
TSDB_CODE_OUT_OF_MEMORY
);
REPLACE_NODE
(
pTarget
);
++
slotId
;
}
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SSetSlotIdCxt
{
int32_t
errCode
;
SHashObj
*
pLeftHash
;
SHashObj
*
pRightHash
;
}
SSetSlotIdCxt
;
static
EDealRes
doSetSlotId
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
)
&&
0
!=
strcmp
(((
SColumnNode
*
)
pNode
)
->
colName
,
"*"
))
{
SSetSlotIdCxt
*
pCxt
=
(
SSetSlotIdCxt
*
)
pContext
;
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
int32_t
len
=
getSlotKey
(
pNode
,
name
);
SSlotIndex
*
pIndex
=
taosHashGet
(
pCxt
->
pLeftHash
,
name
,
len
);
if
(
NULL
==
pIndex
)
{
pIndex
=
taosHashGet
(
pCxt
->
pRightHash
,
name
,
len
);
}
// pIndex is definitely not NULL, otherwise it is a bug
((
SColumnNode
*
)
pNode
)
->
dataBlockId
=
pIndex
->
dataBlockId
;
((
SColumnNode
*
)
pNode
)
->
slotId
=
pIndex
->
slotId
;
CHECK_ALLOC
(
pNode
,
DEAL_RES_ERROR
);
return
DEAL_RES_IGNORE_CHILD
;
}
return
DEAL_RES_CONTINUE
;
}
static
SNode
*
setNodeSlotId
(
SPhysiPlanContext
*
pCxt
,
int16_t
leftDataBlockId
,
int16_t
rightDataBlockId
,
SNode
*
pNode
)
{
SNode
*
pRes
=
nodesCloneNode
(
pNode
);
CHECK_ALLOC
(
pRes
,
NULL
);
SSetSlotIdCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pLeftHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
leftDataBlockId
),
.
pRightHash
=
(
rightDataBlockId
<
0
?
NULL
:
taosArrayGetP
(
pCxt
->
pLocationHelper
,
rightDataBlockId
))
};
nodesWalkNode
(
pRes
,
doSetSlotId
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyNode
(
pRes
);
return
NULL
;
}
return
pRes
;
}
static
SNodeList
*
setListSlotId
(
SPhysiPlanContext
*
pCxt
,
int16_t
leftDataBlockId
,
int16_t
rightDataBlockId
,
SNodeList
*
pList
)
{
SNodeList
*
pRes
=
nodesCloneList
(
pList
);
CHECK_ALLOC
(
pRes
,
NULL
);
SSetSlotIdCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pLeftHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
leftDataBlockId
),
.
pRightHash
=
(
rightDataBlockId
<
0
?
NULL
:
taosArrayGetP
(
pCxt
->
pLocationHelper
,
rightDataBlockId
))
};
nodesWalkList
(
pRes
,
doSetSlotId
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
pRes
);
return
NULL
;
}
return
pRes
;
}
static
SPhysiNode
*
makePhysiNode
(
SPhysiPlanContext
*
pCxt
,
ENodeType
type
)
{
SPhysiNode
*
pPhysiNode
=
(
SPhysiNode
*
)
nodesMakeNode
(
type
);
if
(
NULL
==
pPhysiNode
)
{
return
NULL
;
}
pPhysiNode
->
outputDataBlockDesc
.
dataBlockId
=
pCxt
->
nextDataBlockId
++
;
pPhysiNode
->
outputDataBlockDesc
.
type
=
QUERY_NODE_DATABLOCK_DESC
;
return
pPhysiNode
;
}
static
int32_t
setConditionsSlotId
(
SPhysiPlanContext
*
pCxt
,
const
SLogicNode
*
pLogicNode
,
SPhysiNode
*
pPhysiNode
)
{
if
(
NULL
!=
pLogicNode
->
pConditions
)
{
pPhysiNode
->
pConditions
=
setNodeSlotId
(
pCxt
,
pPhysiNode
->
outputDataBlockDesc
.
dataBlockId
,
-
1
,
pLogicNode
->
pConditions
);
CHECK_ALLOC
(
pPhysiNode
->
pConditions
,
TSDB_CODE_OUT_OF_MEMORY
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
setSlotOutput
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pTargets
,
SDataBlockDescNode
*
pDataBlockDesc
)
{
SHashObj
*
pHash
=
taosArrayGetP
(
pCxt
->
pLocationHelper
,
pDataBlockDesc
->
dataBlockId
);
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
SNode
*
pNode
;
FOREACH
(
pNode
,
pTargets
)
{
int32_t
len
=
getSlotKey
(
pNode
,
name
);
SSlotIndex
*
pIndex
=
taosHashGet
(
pHash
,
name
,
len
);
((
SSlotDescNode
*
)
nodesListGetNode
(
pDataBlockDesc
->
pSlots
,
pIndex
->
slotId
))
->
output
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
initScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
,
SScanPhysiNode
*
pScanPhysiNode
)
{
if
(
NULL
!=
pScanLogicNode
->
pScanCols
)
{
pScanPhysiNode
->
pScanCols
=
nodesCloneList
(
pScanLogicNode
->
pScanCols
);
CHECK_ALLOC
(
pScanPhysiNode
->
pScanCols
,
TSDB_CODE_OUT_OF_MEMORY
);
}
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pScanPhysiNode
->
pScanCols
,
&
pScanPhysiNode
->
node
.
outputDataBlockDesc
),
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pScanLogicNode
,
(
SPhysiNode
*
)
pScanPhysiNode
),
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
setSlotOutput
(
pCxt
,
pScanLogicNode
->
node
.
pTargets
,
&
pScanPhysiNode
->
node
.
outputDataBlockDesc
),
TSDB_CODE_OUT_OF_MEMORY
);
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
uid
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
pScanPhysiNode
->
order
=
TSDB_ORDER_ASC
;
pScanPhysiNode
->
count
=
1
;
pScanPhysiNode
->
reverse
=
0
;
return
TSDB_CODE_SUCCESS
;
}
static
SPhysiNode
*
createTagScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
)
{
STagScanPhysiNode
*
pTagScan
=
(
STagScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
);
CHECK_ALLOC
(
pTagScan
,
NULL
);
CHECK_CODE
(
initScanPhysiNode
(
pCxt
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTagScan
),
(
SPhysiNode
*
)
pTagScan
);
return
(
SPhysiNode
*
)
pTagScan
;
}
static
SPhysiNode
*
createTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
)
{
STableScanPhysiNode
*
pTableScan
=
(
STableScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
CHECK_ALLOC
(
pTableScan
,
NULL
);
CHECK_CODE
(
initScanPhysiNode
(
pCxt
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTableScan
),
(
SPhysiNode
*
)
pTableScan
);
pTableScan
->
scanFlag
=
pScanLogicNode
->
scanFlag
;
pTableScan
->
scanRange
=
pScanLogicNode
->
scanRange
;
return
(
SPhysiNode
*
)
pTableScan
;
}
static
SPhysiNode
*
createScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
)
{
switch
(
pScanLogicNode
->
scanType
)
{
case
SCAN_TYPE_TAG
:
return
createTagScanPhysiNode
(
pCxt
,
pScanLogicNode
);
case
SCAN_TYPE_TABLE
:
return
createTableScanPhysiNode
(
pCxt
,
pScanLogicNode
);
case
SCAN_TYPE_STABLE
:
case
SCAN_TYPE_STREAM
:
break
;
default:
break
;
}
return
NULL
;
}
static
SNodeList
*
createJoinOutputCols
(
SPhysiPlanContext
*
pCxt
,
SDataBlockDescNode
*
pLeftDesc
,
SDataBlockDescNode
*
pRightDesc
)
{
SNodeList
*
pCols
=
nodesMakeList
();
CHECK_ALLOC
(
pCols
,
NULL
);
SNode
*
pNode
;
FOREACH
(
pNode
,
pLeftDesc
->
pSlots
)
{
SSlotDescNode
*
pSlot
=
(
SSlotDescNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
goto
error
;
}
pCol
->
node
.
resType
=
pSlot
->
dataType
;
pCol
->
dataBlockId
=
pLeftDesc
->
dataBlockId
;
pCol
->
slotId
=
pSlot
->
slotId
;
pCol
->
colId
=
-
1
;
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pCols
,
(
SNode
*
)
pCol
))
{
goto
error
;
}
}
FOREACH
(
pNode
,
pRightDesc
->
pSlots
)
{
SSlotDescNode
*
pSlot
=
(
SSlotDescNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
goto
error
;
}
pCol
->
node
.
resType
=
pSlot
->
dataType
;
pCol
->
dataBlockId
=
pRightDesc
->
dataBlockId
;
pCol
->
slotId
=
pSlot
->
slotId
;
pCol
->
colId
=
-
1
;
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pCols
,
(
SNode
*
)
pCol
))
{
goto
error
;
}
}
return
pCols
;
error:
nodesDestroyList
(
pCols
);
return
NULL
;
}
static
SPhysiNode
*
createJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
)
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_JOIN
);
CHECK_ALLOC
(
pJoin
,
NULL
);
SDataBlockDescNode
*
pLeftDesc
=
&
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
outputDataBlockDesc
;
SDataBlockDescNode
*
pRightDesc
=
&
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
1
))
->
outputDataBlockDesc
;
pJoin
->
pOnConditions
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pOnConditions
);
CHECK_ALLOC
(
pJoin
->
pOnConditions
,
(
SPhysiNode
*
)
pJoin
);
pJoin
->
pTargets
=
createJoinOutputCols
(
pCxt
,
pLeftDesc
,
pRightDesc
);
CHECK_ALLOC
(
pJoin
->
pTargets
,
(
SPhysiNode
*
)
pJoin
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pJoin
->
pTargets
,
&
pJoin
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pJoin
);
CHECK_CODE
(
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pJoinLogicNode
,
(
SPhysiNode
*
)
pJoin
),
(
SPhysiNode
*
)
pJoin
);
CHECK_CODE
(
setSlotOutput
(
pCxt
,
pJoinLogicNode
->
node
.
pTargets
,
&
pJoin
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pJoin
);
return
(
SPhysiNode
*
)
pJoin
;
}
typedef
struct
SRewritePrecalcExprsCxt
{
int32_t
errCode
;
int32_t
planNodeId
;
int32_t
rewriteId
;
SNodeList
*
pPrecalcExprs
;
}
SRewritePrecalcExprsCxt
;
static
EDealRes
collectAndRewrite
(
SRewritePrecalcExprsCxt
*
pCxt
,
SNode
**
pNode
)
{
SNode
*
pExpr
=
nodesCloneNode
(
*
pNode
);
CHECK_ALLOC
(
pExpr
,
DEAL_RES_ERROR
);
if
(
nodesListAppend
(
pCxt
->
pPrecalcExprs
,
pExpr
))
{
nodesDestroyNode
(
pExpr
);
return
DEAL_RES_ERROR
;
}
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
nodesDestroyNode
(
pExpr
);
return
DEAL_RES_ERROR
;
}
SExprNode
*
pRewrittenExpr
=
(
SExprNode
*
)
pExpr
;
pCol
->
node
.
resType
=
pRewrittenExpr
->
resType
;
if
(
'\0'
!=
pRewrittenExpr
->
aliasName
[
0
])
{
strcpy
(
pCol
->
colName
,
pRewrittenExpr
->
aliasName
);
}
else
{
snprintf
(
pRewrittenExpr
->
aliasName
,
sizeof
(
pRewrittenExpr
->
aliasName
),
"#expr_%d_%d"
,
pCxt
->
planNodeId
,
pCxt
->
rewriteId
);
strcpy
(
pCol
->
colName
,
pRewrittenExpr
->
aliasName
);
}
nodesDestroyNode
(
*
pNode
);
*
pNode
=
(
SNode
*
)
pCol
;
return
DEAL_RES_IGNORE_CHILD
;
}
static
EDealRes
doRewritePrecalcExprs
(
SNode
**
pNode
,
void
*
pContext
)
{
SRewritePrecalcExprsCxt
*
pCxt
=
(
SRewritePrecalcExprsCxt
*
)
pContext
;
switch
(
nodeType
(
*
pNode
))
{
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
{
return
collectAndRewrite
(
pContext
,
pNode
);
}
case
QUERY_NODE_FUNCTION
:
{
if
(
!
fmIsAggFunc
(((
SFunctionNode
*
)(
*
pNode
))
->
funcId
))
{
return
collectAndRewrite
(
pContext
,
pNode
);
}
}
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
rewritePrecalcExprs
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pList
,
SNodeList
**
pPrecalcExprs
,
SNodeList
**
pRewrittenList
)
{
if
(
NULL
==
pList
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
*
pPrecalcExprs
)
{
*
pPrecalcExprs
=
nodesMakeList
();
CHECK_ALLOC
(
*
pPrecalcExprs
,
TSDB_CODE_OUT_OF_MEMORY
);
}
if
(
NULL
==
*
pRewrittenList
)
{
*
pRewrittenList
=
nodesMakeList
();
CHECK_ALLOC
(
*
pRewrittenList
,
TSDB_CODE_OUT_OF_MEMORY
);
}
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
SNode
*
pNew
=
NULL
;
if
(
QUERY_NODE_GROUPING_SET
==
nodeType
(
pNode
))
{
pNew
=
nodesCloneNode
(
nodesListGetNode
(((
SGroupingSetNode
*
)
pNode
)
->
pParameterList
,
0
));
}
else
{
pNew
=
nodesCloneNode
(
pNode
);
}
CHECK_ALLOC
(
pNew
,
TSDB_CODE_OUT_OF_MEMORY
);
CHECK_CODE
(
nodesListAppend
(
*
pRewrittenList
,
pNew
),
TSDB_CODE_OUT_OF_MEMORY
);
}
SRewritePrecalcExprsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pPrecalcExprs
=
*
pPrecalcExprs
};
nodesRewriteList
(
*
pRewrittenList
,
doRewritePrecalcExprs
,
&
cxt
);
if
(
0
==
LIST_LENGTH
(
cxt
.
pPrecalcExprs
))
{
nodesDestroyList
(
cxt
.
pPrecalcExprs
);
*
pPrecalcExprs
=
NULL
;
}
return
cxt
.
errCode
;
}
static
SPhysiNode
*
createAggPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SAggLogicNode
*
pAggLogicNode
)
{
SAggPhysiNode
*
pAgg
=
(
SAggPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_AGG
);
CHECK_ALLOC
(
pAgg
,
NULL
);
SNodeList
*
pPrecalcExprs
=
NULL
;
SNodeList
*
pGroupKeys
=
NULL
;
SNodeList
*
pAggFuncs
=
NULL
;
CHECK_CODE
(
rewritePrecalcExprs
(
pCxt
,
pAggLogicNode
->
pGroupKeys
,
&
pPrecalcExprs
,
&
pGroupKeys
),
(
SPhysiNode
*
)
pAgg
);
CHECK_CODE
(
rewritePrecalcExprs
(
pCxt
,
pAggLogicNode
->
pAggFuncs
,
&
pPrecalcExprs
,
&
pAggFuncs
),
(
SPhysiNode
*
)
pAgg
);
SDataBlockDescNode
*
pChildTupe
=
&
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
outputDataBlockDesc
);
// push down expression to outputDataBlockDesc of child node
if
(
NULL
!=
pPrecalcExprs
)
{
pAgg
->
pExprs
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pPrecalcExprs
);
CHECK_ALLOC
(
pAgg
->
pExprs
,
(
SPhysiNode
*
)
pAgg
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pAgg
->
pExprs
,
pChildTupe
),
(
SPhysiNode
*
)
pAgg
);
}
if
(
NULL
!=
pGroupKeys
)
{
pAgg
->
pGroupKeys
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pGroupKeys
);
CHECK_ALLOC
(
pAgg
->
pGroupKeys
,
(
SPhysiNode
*
)
pAgg
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pAgg
->
pGroupKeys
,
&
pAgg
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pAgg
);
}
if
(
NULL
!=
pAggFuncs
)
{
pAgg
->
pAggFuncs
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pAggFuncs
);
CHECK_ALLOC
(
pAgg
->
pAggFuncs
,
(
SPhysiNode
*
)
pAgg
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pAgg
->
pAggFuncs
,
&
pAgg
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pAgg
);
}
CHECK_CODE
(
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pAggLogicNode
,
(
SPhysiNode
*
)
pAgg
),
(
SPhysiNode
*
)
pAgg
);
CHECK_CODE
(
setSlotOutput
(
pCxt
,
pAggLogicNode
->
node
.
pTargets
,
&
pAgg
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pAgg
);
return
(
SPhysiNode
*
)
pAgg
;
}
static
SPhysiNode
*
createProjectPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SProjectLogicNode
*
pProjectLogicNode
)
{
SProjectPhysiNode
*
pProject
=
(
SProjectPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
);
CHECK_ALLOC
(
pProject
,
NULL
);
pProject
->
pProjections
=
setListSlotId
(
pCxt
,
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
outputDataBlockDesc
.
dataBlockId
,
-
1
,
pProjectLogicNode
->
pProjections
);
CHECK_ALLOC
(
pProject
->
pProjections
,
(
SPhysiNode
*
)
pProject
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pProject
->
pProjections
,
&
pProject
->
node
.
outputDataBlockDesc
),
(
SPhysiNode
*
)
pProject
);
CHECK_CODE
(
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pProjectLogicNode
,
(
SPhysiNode
*
)
pProject
),
(
SPhysiNode
*
)
pProject
);
return
(
SPhysiNode
*
)
pProject
;
}
static
SPhysiNode
*
createPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicPlan
)
{
SNodeList
*
pChildren
=
nodesMakeList
();
CHECK_ALLOC
(
pChildren
,
NULL
);
SNode
*
pLogicChild
;
FOREACH
(
pLogicChild
,
pLogicPlan
->
pChildren
)
{
SNode
*
pChildPhyNode
=
(
SNode
*
)
createPhysiNode
(
pCxt
,
(
SLogicNode
*
)
pLogicChild
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pChildren
,
pChildPhyNode
))
{
pCxt
->
errCode
=
TSDB_CODE_OUT_OF_MEMORY
;
nodesDestroyList
(
pChildren
);
return
NULL
;
}
}
SPhysiNode
*
pPhyNode
=
NULL
;
switch
(
nodeType
(
pLogicPlan
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
pPhyNode
=
createScanPhysiNode
(
pCxt
,
(
SScanLogicNode
*
)
pLogicPlan
);
break
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
pPhyNode
=
createJoinPhysiNode
(
pCxt
,
pChildren
,
(
SJoinLogicNode
*
)
pLogicPlan
);
break
;
case
QUERY_NODE_LOGIC_PLAN_AGG
:
pPhyNode
=
createAggPhysiNode
(
pCxt
,
pChildren
,
(
SAggLogicNode
*
)
pLogicPlan
);
break
;
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
pPhyNode
=
createProjectPhysiNode
(
pCxt
,
pChildren
,
(
SProjectLogicNode
*
)
pLogicPlan
);
break
;
default:
break
;
}
pPhyNode
->
pChildren
=
pChildren
;
SNode
*
pChild
;
FOREACH
(
pChild
,
pPhyNode
->
pChildren
)
{
((
SPhysiNode
*
)
pChild
)
->
pParent
=
pPhyNode
;
}
return
pPhyNode
;
}
int32_t
createPhysiPlan
(
SLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SPhysiPlanContext
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
nextDataBlockId
=
0
,
.
pLocationHelper
=
taosArrayInit
(
32
,
POINTER_BYTES
)
};
if
(
NULL
==
cxt
.
pLocationHelper
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pPhyNode
=
createPhysiNode
(
&
cxt
,
pLogicNode
);
return
cxt
.
errCode
;
}
int32_t
buildPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
)
{
// split
// scale out
// maping
// create
return
TSDB_CODE_SUCCESS
;
}
source/libs/planner/src/physicalPlanJson.c
已删除
100644 → 0
浏览文件 @
768e0593
此差异已折叠。
点击以展开。
source/libs/planner/src/planner.c
浏览文件 @
cd7a0e55
...
...
@@ -15,8 +15,22 @@
#include "planner.h"
int32_t
qCreateQueryPlan
(
SPlanContext
*
pCxt
,
SQueryPlan
**
pPlan
)
{
#include "plannerInt.h"
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
qCreateQueryPlan
(
SPlanContext
*
pCxt
,
SQueryPlan
**
pPlan
)
{
SLogicNode
*
pLogicNode
=
NULL
;
int32_t
code
=
createLogicPlan
(
pCxt
,
&
pLogicNode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
optimize
(
pCxt
,
pLogicNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildPhysiPlan
(
pCxt
,
pLogicNode
,
pPlan
);
}
return
code
;
}
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
)
{
...
...
source/libs/planner/src/plannerImpl.c
已删除
100644 → 0
浏览文件 @
768e0593
此差异已折叠。
点击以展开。
source/libs/planner/test/phyPlanTests.cpp
已删除
100644 → 0
浏览文件 @
768e0593
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include <gtest/gtest.h>
#include "plannerInt.h"
#include "mockCatalogService.h"
using namespace std;
using namespace testing;
void* myCalloc(size_t nmemb, size_t size) {
if (void* p = calloc(nmemb, size)) {
return p;
}
throw bad_alloc();
}
class PhyPlanTest : public Test {
protected:
void pushAgg(int32_t aggOp) {
unique_ptr<SQueryPlanNode> agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
agg->info.type = aggOp;
agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
unique_ptr<SExprInfo> expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo)));
expr->base.resSchema.type = TSDB_DATA_TYPE_INT;
expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode));
expr->pExpr->nodeType = TEXPR_FUNCTION_NODE;
strcpy(expr->pExpr->_function.functionName, "Count");
SExprInfo* item = expr.release();
taosArrayPush(agg->pExpr, &item);
pushNode(agg.release());
}
void pushScan(const string& db, const string& table, int32_t scanOp) {
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
EXPECT_TRUE(meta);
unique_ptr<SQueryPlanNode> scan((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
scan->info.type = scanOp;
scan->numOfCols = meta->schema->tableInfo.numOfColumns;
scan->pSchema = (SSchema*)myCalloc(1, sizeof(SSchema) * scan->numOfCols);
memcpy(scan->pSchema, meta->schema->schema, sizeof(SSchema) * scan->numOfCols);
//todo 'pExpr' 'numOfExpr'
scan->pExtInfo = createScanExtInfo(meta);
pushNode(scan.release());
}
int32_t run() {
SQueryDag* dag = nullptr;
uint64_t requestId = 20;
int32_t code = createDag(logicPlan_.get(), nullptr, &dag, NULL, requestId);
dag_.reset(dag);
return code;
}
int32_t run(const string& db, const string& sql) {
SParseContext cxt;
buildParseContext(db, sql, &cxt);
SQueryNode* query;
int32_t code = qParseQuerySql(&cxt, &query);
if (TSDB_CODE_SUCCESS != code) {
cout << "error no:" << code << ", msg:" << cxt.pMsg << endl;
return code;
}
SQueryDag* dag = nullptr;
uint64_t requestId = 20;
SSchema *schema = NULL;
int32_t numOfOutput = 0;
code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, nullptr, requestId);
dag_.reset(dag);
return code;
}
void explain() {
size_t level = taosArrayGetSize(dag_->pSubplans);
for (size_t i = 0; i < level; ++i) {
std::cout << "level " << i << ":" << std::endl;
const SArray* subplans = (const SArray*)taosArrayGetP(dag_->pSubplans, i);
size_t num = taosArrayGetSize(subplans);
for (size_t j = 0; j < num; ++j) {
std::cout << "no " << j << ":" << std::endl;
int32_t len = 0;
char* str = nullptr;
ASSERT_EQ(TSDB_CODE_SUCCESS, qSubPlanToString((const SSubplan*)taosArrayGetP(subplans, j), &str, &len));
std::cout << "len:" << len << std::endl;
std::cout << str << std::endl;
free(str);
}
}
}
SQueryDag* result() {
return dag_.get();
}
private:
void pushNode(SQueryPlanNode* node) {
if (logicPlan_) {
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
SQueryPlanNode* child = logicPlan_.release();
taosArrayPush(node->pChildren, &child);
}
logicPlan_.reset(node);
}
void copySchemaMeta(STableMeta** dst, const STableMeta* src) {
int32_t size = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
*dst = (STableMeta*)myCalloc(1, size);
memcpy(*dst, src, size);
}
void copyStorageMeta(SVgroupsInfo** dst, const std::vector<SVgroupInfo>& src) {
*dst = (SVgroupsInfo*)myCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * src.size());
(*dst)->numOfVgroups = src.size();
for (int32_t i = 0; i < src.size(); ++i) {
(*dst)->vgroups[i] = src[i];
}
}
SQueryTableInfo* createScanExtInfo(shared_ptr<MockTableMeta>& meta) {
SQueryTableInfo* info = (SQueryTableInfo*)myCalloc(1, sizeof(SQueryTableInfo));
info->pMeta = (STableMetaInfo*)myCalloc(1, sizeof(STableMetaInfo));
copySchemaMeta(&info->pMeta->pTableMeta, meta->schema.get());
copyStorageMeta(&info->pMeta->vgroupList, meta->vgs);
return info;
}
void buildParseContext(const string& db, const string& sql, SParseContext* pCxt) {
static string _db;
static string _sql;
static const int32_t _msgMaxLen = 4096;
static char _msg[_msgMaxLen];
_db = db;
_sql = sql;
memset(_msg, 0, _msgMaxLen);
pCxt->acctId = 1;
pCxt->db = _db.c_str();
pCxt->requestId = 1;
pCxt->pSql = _sql.c_str();
pCxt->sqlLen = _sql.length();
pCxt->pMsg = _msg;
pCxt->msgLen = _msgMaxLen;
}
shared_ptr<MockTableMeta> meta_;
unique_ptr<SQueryPlanNode> logicPlan_;
unique_ptr<SQueryDag> dag_;
};
// select * from table
TEST_F(PhyPlanTest, tableScanTest) {
pushScan("test", "t1", QNODE_TABLESCAN);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = result();
// todo check
}
TEST_F(PhyPlanTest, serializeTest) {
pushScan("test", "t1", QNODE_TABLESCAN);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
SQueryDag* dag = result();
cout << qDagToString(dag) << endl;
}
// select * from supertable
TEST_F(PhyPlanTest, superTableScanTest) {
pushScan("test", "st1", QNODE_TABLESCAN);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = result();
// todo check
}
// select count(*) from table
TEST_F(PhyPlanTest, simpleAggTest) {
pushScan("test", "t1", QNODE_TABLESCAN);
pushAgg(QNODE_AGGREGATE);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = result();
// todo check
}
// insert into t values(...)
TEST_F(PhyPlanTest, insertTest) {
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = result();
// todo check
}
#endif
\ No newline at end of file
source/libs/planner/test/
newP
lannerTest.cpp
→
source/libs/planner/test/
p
lannerTest.cpp
浏览文件 @
cd7a0e55
...
...
@@ -17,8 +17,8 @@
#include <gtest/gtest.h>
#include "plannerImpl.h"
#include "parser.h"
#include "plannerInt.h"
using
namespace
std
;
using
namespace
testing
;
...
...
@@ -56,7 +56,8 @@ protected:
const
string
syntaxTreeStr
=
toString
(
query_
->
pRoot
,
false
);
SLogicNode
*
pLogicPlan
=
nullptr
;
code
=
createLogicPlan
(
query_
->
pRoot
,
&
pLogicPlan
);
SPlanContext
cxt
=
{
.
queryId
=
1
,
.
pAstRoot
=
query_
->
pRoot
};
code
=
createLogicPlan
(
&
cxt
,
&
pLogicPlan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] logic plan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
...
...
source/libs/p
arser/inc/astCreateContext.h
→
source/libs/p
lanner/test/plannerTestMain.cpp
浏览文件 @
cd7a0e55
...
...
@@ -13,28 +13,29 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_AST_CREATER_H_
#define _TD_AST_CREATER_H_
#include <string>
#ifdef __cplusplus
extern
"C"
{
#endif
#include <gtest/gtest.h>
#include "nodes.h"
#include "parser.h"
#include "mockCatalog.h"
typedef
struct
SAstCreateContex
t
{
SParseContext
*
pQueryCxt
;
bool
notSupport
;
bool
valid
;
SNode
*
pRootNode
;
}
SAstCreateContext
;
class
PlannerEnv
:
public
testing
::
Environmen
t
{
public:
virtual
void
SetUp
()
{
initMetaDataEnv
()
;
generateMetaData
()
;
}
int32_t
createAstCreateContext
(
SParseContext
*
pQueryCxt
,
SAstCreateContext
*
pCxt
);
int32_t
destroyAstCreateContext
(
SAstCreateContext
*
pCxt
);
virtual
void
TearDown
()
{
destroyMetaDataEnv
();
}
#ifdef __cplusplus
}
#endif
PlannerEnv
()
{}
virtual
~
PlannerEnv
()
{
}
};
#endif
/*_TD_AST_CREATER_H_*/
int
main
(
int
argc
,
char
*
argv
[])
{
testing
::
AddGlobalTestEnvironment
(
new
PlannerEnv
());
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/planner/test/plannerTests.cpp
已删除
100644 → 0
浏览文件 @
768e0593
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "os.h"
#include "taos.h"
#include "parser.h"
#include "mockCatalog.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
class
PlannerEnv
:
public
testing
::
Environment
{
public:
virtual
void
SetUp
()
{
initMetaDataEnv
();
generateMetaData
();
}
virtual
void
TearDown
()
{
destroyMetaDataEnv
();
}
PlannerEnv
()
{}
virtual
~
PlannerEnv
()
{}
};
int
main
(
int
argc
,
char
*
argv
[])
{
testing
::
AddGlobalTestEnvironment
(
new
PlannerEnv
());
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
TEST
(
testCase
,
planner_test
)
{
char
msg
[
128
]
=
{
0
};
const
char
*
sql
=
"select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"
;
// SQueryStmtInfo* pQueryInfo = nullptr;
// int32_t code = qParseQuerySql(sql, strlen(sql), &pQueryInfo, 0, msg, sizeof(msg));
// ASSERT_EQ(code, 0);
// SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
// ASSERT_EQ(code, 0);
//
// SMetaReq req = {0};
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
// ASSERT_EQ(ret, 0);
//
// SArray* pExprList = pQueryInfo->exprList;
// ASSERT_EQ(taosArrayGetSize(pExprList), 2);
//
// SExprInfo* p1 = (SExprInfo*)taosArrayGetP(pExprList, 1);
// ASSERT_EQ(p1->base.uid, 110);
// ASSERT_EQ(p1->base.numOfParams, 1);
// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
// ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
// ASSERT_EQ(p1->base.colInfo.flag, TSDB_COL_NORMAL);
// ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
// ASSERT_EQ(p1->base.interBytes, 16);
//
// ASSERT_EQ(p1->pExpr->nodeType, TEXPR_UNARYEXPR_NODE);
// ASSERT_EQ(p1->pExpr->_node.functionId, FUNCTION_TOP);
// ASSERT_TRUE(p1->pExpr->_node.pRight == NULL);
//
// tExprNode* pParam = p1->pExpr->_node.pLeft;
//
// ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE);
// ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_DIVIDE);
// ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_BINARYEXPR_NODE);
// ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_VALUE_NODE);
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
//
// destroyQueryInfo(pQueryInfo);
// qParserCleanupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
}
#pragma GCC diagnostic pop
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录