提交 ee9013e4 编写于 作者: H Haojun Liao

[TD-10529]refactor & add required APIs.

上级 c6d23aa8
......@@ -25,6 +25,7 @@ extern "C" {
#include "tarray.h"
#include "taosdef.h"
#include "transport.h"
#include "common.h"
struct SCatalog;
......@@ -63,13 +64,6 @@ typedef struct SCTableMeta {
uint64_t suid;
} SCTableMeta;
typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
int16_t colId;
int16_t bytes;
} SSchema;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
......
......@@ -29,8 +29,7 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
/**
* the main query execution function, including query on both table and multiple tables,
......
......@@ -22,8 +22,149 @@ extern "C" {
#include "catalog.h"
#include "common.h"
#include "tname.h"
typedef struct SInterval {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
char offsetUnit;
int64_t interval;
int64_t sliding;
int64_t offset;
} SInterval;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int32_t primaryColId; // primary timestamp column
} SSessionWindow;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SOrder {
uint32_t order;
int32_t orderColId;
} SOrder;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
size_t tableMetaCapacity;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SOrder order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
bool distinct; // distinct tag or not
bool onlyHasTagCond;
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
struct SQueryStmtInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool stateWindow;
bool globalMerge;
bool multigroupResult;
} SQueryStmtInfo;
struct SQueryStmtInfo;
struct SInsertStmtInfo;
/**
......
......@@ -20,12 +20,35 @@
extern "C" {
#endif
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
struct SEpSet;
struct SQueryNode;
struct SQueryPhyNode;
struct SQueryStmtInfo;
typedef struct SSubquery {
int64_t queryId; // the subquery id created by qnode
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL
int32_t level; // the execution level of current subquery, starting from 0.
SArray *pUpstream; // the upstream,from which to fetch the result
struct SQueryPhyNode *pNode; // physical plan of current subquery
} SSubquery;
typedef struct SQueryJob {
SArray **pSubqueries;
int32_t numOfLevels;
int32_t currentLevel;
} SQueryJob;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t qOptimizeQueryPlan(SQueryNode* pQueryNode);
int32_t qOptimizeQueryPlan(struct SQueryNode* pQueryNode);
/**
* Create the query plan according to the bound AST, which is in the form of pQueryInfo
......@@ -33,14 +56,14 @@ int32_t qOptimizeQueryPlan(SQueryNode* pQueryNode);
* @param pQueryNode
* @return
*/
int32_t qCreateQueryPlan(const SQueryInfo* pQueryInfo, SQueryNode* pQueryNode);
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryNode* pQueryNode);
/**
* Convert the query plan to string, in order to display it in the shell.
* @param pQueryNode
* @return
*/
int32_t qQueryPlanToString(SQueryNode* pQueryNode, char** str);
int32_t qQueryPlanToString(struct SQueryNode* pQueryNode, char** str);
/**
* Restore the SQL statement according to the logic query plan.
......@@ -48,7 +71,7 @@ int32_t qQueryPlanToString(SQueryNode* pQueryNode, char** str);
* @param sql
* @return
*/
int32_t qQueryPlanToSql(SQueryNode* pQueryNode, char** sql);
int32_t qQueryPlanToSql(struct SQueryNode* pQueryNode, char** sql);
/**
* Create the physical plan for the query, according to the logic plan.
......@@ -56,7 +79,7 @@ int32_t qQueryPlanToSql(SQueryNode* pQueryNode, char** sql);
* @param pPhyNode
* @return
*/
int32_t qCreatePhysicalPlan(SQueryNode* pQueryNode, SEpSet* pQnode, SQueryPhyNode *pPhyNode);
int32_t qCreatePhysicalPlan(struct SQueryNode* pQueryNode, struct SEpSet* pQnode, struct SQueryPhyNode *pPhyNode);
/**
* Convert to physical plan to string to enable to print it out in the shell.
......@@ -64,7 +87,28 @@ int32_t qCreatePhysicalPlan(SQueryNode* pQueryNode, SEpSet* pQnode, SQueryPhyNod
* @param str
* @return
*/
int32_t qPhyPlanToString(SQueryPhyNode *pPhyNode, char** str);
int32_t qPhyPlanToString(struct SQueryPhyNode *pPhyNode, char** str);
/**
* Destroy the query plan object.
* @return
*/
void* qDestroyQueryPlan(struct SQueryNode* pQueryNode);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void* qDestroyQueryPhyPlan(struct SQueryPhyNode* pQueryPhyNode);
/**
* Create the query job from the physical execution plan
* @param pPhyNode
* @param pJob
* @return
*/
int32_t qCreateQueryJob(const struct SQueryPhyNode* pPhyNode, struct SQueryJob** pJob);
#ifdef __cplusplus
}
......
......@@ -20,16 +20,7 @@
extern "C" {
#endif
#define QUERY_TASK_MERGE 1
#define QUERY_TASK_PARTIAL 2
/**
* create query job from the physical execution plan
* @param pPhyNode
* @param pJob
* @return
*/
int32_t qCreateQueryJob(const SQueryPhyNode* pPhyNode, SQueryJob* pJob);
struct SQueryJob;
/**
* Process the query job, generated according to the query physical plan.
......@@ -37,7 +28,7 @@ int32_t qCreateQueryJob(const SQueryPhyNode* pPhyNode, SQueryJob* pJob);
* @param pJob
* @return
*/
int32_t qProcessQueryJob(SQueryJob* pJob);
int32_t qProcessQueryJob(struct SQueryJob* pJob);
/**
* The SSqlObj should not be here????
......@@ -46,14 +37,14 @@ int32_t qProcessQueryJob(SQueryJob* pJob);
* @param pRetVgroupId
* @return
*/
SArray* qGetInvolvedVgroupIdList(SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId);
//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId);
/**
* Cancel query job
* @param pJob
* @return
*/
int32_t qKillQueryJob(SQueryJob* pJob);
int32_t qKillQueryJob(struct SQueryJob* pJob);
#ifdef __cplusplus
}
......
......@@ -28,6 +28,9 @@ extern "C" {
#include <string.h>
#include <sched.h>
#include <ctype.h>
#include <errno.h>
#include <float.h>
#include <math.h>
#include "osAtomic.h"
#include "osDef.h"
......
......@@ -11,4 +11,11 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
//#include "taos.h"
//TAOS_RES *taos_query(TAOS *taos, const char *sql) {
//
//}
......@@ -4,4 +4,9 @@ target_include_directories(
common
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
common
PRIVATE os util
)
\ No newline at end of file
......@@ -21,96 +21,10 @@ extern "C" {
#endif
#include "catalog.h"
#include "parser.h"
#include "tname.h"
#include "astGen.h"
#include "astGenerator.h"
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SInterval {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
char offsetUnit;
int64_t interval;
int64_t sliding;
int64_t offset;
} SInterval;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int32_t primaryColId; // primary timestamp column
} SSessionWindow;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; // todo remove it
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SLimitVal {
int64_t limit;
int64_t offset;
} SLimitVal;
typedef struct SOrderVal {
uint32_t order;
int32_t orderColId;
} SOrderVal;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
size_t tableMetaCapacity;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
struct SSqlNode;
typedef struct SInsertStmtInfo {
SHashObj *pTableBlockHashList; // data block for each table
......@@ -121,71 +35,16 @@ typedef struct SInsertStmtInfo {
char *sql; // current sql statement position
} SInsertStmtInfo;
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SArray * colCond;
SOrderVal order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
bool distinct; // distinct tag or not
bool onlyHasTagCond;
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
struct SQueryStmtInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool stateWindow;
bool globalMerge;
bool multigroupResult;
} SQueryStmtInfo;
/**
* validate the AST by pNode
* @param pNode
* @return SQueryNode a bounded AST with essential meta data from local buffer or mgmt node
* Validate the sql info, according to the corresponding metadata info from catalog.
* @param pCatalog
* @param pSqlInfo
* @param pQueryInfo a bounded AST with essential meta data from local buffer or mgmt node
* @param id
* @param msg
* @return
*/
int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, const SSqlNode* pNode, SQueryStmtInfo* pQueryInfo);
int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQueryStmtInfo* pQueryInfo, int64_t id, char* msg);
/**
*
......@@ -193,7 +52,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, const SSqlNode* pNode,
* @param pMetaInfo
* @return
*/
int32_t qParserExtractRequestedMetaInfo(const SSqlNode* pSqlNode, SMetaReq* pMetaInfo);
int32_t qParserExtractRequestedMetaInfo(const struct SSqlNode* pSqlNode, SMetaReq* pMetaInfo);
#ifdef __cplusplus
}
......
......@@ -14,5 +14,42 @@
*/
#include "parserInt.h"
#include "astGen.h"
#include "ttoken.h"
\ No newline at end of file
#include "ttoken.h"
#include "astGenerator.h"
bool qIsInsertSql(const char* pStr, size_t length) {
return false;
}
int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg) {
*pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
if (*pQueryInfo == NULL) {
return -1; // set correct error code.
}
SSqlInfo info = genAST(pStr);
if (!info.valid) {
strcpy(msg, info.msg);
return -1; // set correct error code.
}
struct SCatalog* pCatalog = getCatalogHandle(NULL);
int32_t code = qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg);
if (code != 0) {
return code;
}
return 0;
}
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg) {
return 0;
}
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
return 0;
}
int32_t qParserExtractRequestedMetaInfo(const struct SSqlNode* pSqlNode, SMetaReq* pMetaInfo) {
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -4,4 +4,9 @@ target_include_directories(
planner
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/planner"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
planner
PRIVATE os util common catalog parser
)
\ No newline at end of file
......@@ -20,6 +20,39 @@
extern "C" {
#endif
#include "common.h"
#include "tarray.h"
#include "planner.h"
typedef struct SQueryNodeBasicInfo {
int32_t type;
char *name;
} SQueryNodeBasicInfo;
typedef struct SQueryTableInfo {
char *tableName;
uint64_t uid;
int32_t tid;
} SQueryTableInfo;
typedef struct SQueryNode {
SQueryNodeBasicInfo info;
SQueryTableInfo tableInfo;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
struct SExprInfo *pExpr; // the query functions or sql aggregations
int32_t numOfOutput; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pPrevNodes; // upstream nodes
struct SQueryNode *nextNode;
} SQueryNode;
typedef struct SQueryPhyNode {
} SQueryPhyNode;
#ifdef __cplusplus
}
#endif
......
......@@ -11,4 +11,44 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "os.h"
#include "plannerInt.h"
#include "parser.h"
int32_t qOptimizeQueryPlan(struct SQueryNode* pQueryNode) {
return 0;
}
int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryNode* pQueryNode) {
return 0;
}
int32_t qQueryPlanToString(struct SQueryNode* pQueryNode, char** str) {
return 0;
}
int32_t qQueryPlanToSql(struct SQueryNode* pQueryNode, char** sql) {
return 0;
}
int32_t qCreatePhysicalPlan(struct SQueryNode* pQueryNode, struct SEpSet* pQnode, struct SQueryPhyNode *pPhyNode) {
return 0;
}
int32_t qPhyPlanToString(struct SQueryPhyNode *pPhyNode, char** str) {
return 0;
}
void* qDestroyQueryPlan(struct SQueryNode* pQueryNode) {
return NULL;
}
void* qDestroyQueryPhyPlan(struct SQueryPhyNode* pQueryPhyNode) {
return NULL;
}
int32_t qCreateQueryJob(const struct SQueryPhyNode* pPhyNode, struct SQueryJob** pJob) {
return 0;
}
\ No newline at end of file
aux_source_directory(src SCHEDULER_SRC)
add_library(scheduler ${SCHEDULER_SRC})
target_include_directories(
scheduler
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scheduler"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
scheduler
PRIVATE os util planner
)
\ No newline at end of file
......@@ -20,6 +20,25 @@
extern "C" {
#endif
#include "os.h"
#include "tarray.h"
#include "planner.h"
#include "scheduler.h"
typedef struct SSubquery {
int64_t taskId; // the task id created by qnode
int32_t type;
int32_t level;
struct SQueryPhyNode *pNode;
SArray *pUpstream;
} SSubquery;
typedef struct SQuery {
SArray **pSubquery;
int32_t numOfLevels;
int32_t currentLevel;
} SQuery;
#ifdef __cplusplus
}
#endif
......
......@@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "schedulerInt.h"
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册