提交 768e0593 编写于 作者: X Xiaoyu Wang

TD-13747 New SQL model integration

上级 42d91c3d
......@@ -23,6 +23,7 @@ extern "C" {
#include "os.h"
#include "thash.h"
#include "executor.h"
#include "plannodes.h"
#define DS_BUF_LOW 1
#define DS_BUF_FULL 2
......@@ -59,7 +60,7 @@ typedef struct SOutputData {
* @param pHandle output
* @return error code
*/
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle);
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
/**
* Put the result set returned by the executor into datasinker.
......
......@@ -70,6 +70,7 @@ typedef enum ENodeType {
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT,
QUERY_NODE_VNODE_MODIF_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
......@@ -80,9 +81,15 @@ typedef enum ENodeType {
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_DSINK_DISPATCH
} ENodeType;
/**
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "querynodes.h"
#include "query.h"
#include "tmsg.h"
typedef struct SLogicNode {
......@@ -78,6 +79,8 @@ typedef struct SDataBlockDescNode {
ENodeType type;
int16_t dataBlockId;
SNodeList* pSlots;
int32_t resultRowSize;
int16_t precision;
} SDataBlockDescNode;
typedef struct SPhysiNode {
......@@ -129,6 +132,34 @@ typedef struct SAggPhysiNode {
SNodeList* pAggFuncs;
} SAggPhysiNode;
typedef struct SDownstreamSource {
SQueryNodeAddr addr;
uint64_t taskId;
uint64_t schedId;
} SDownstreamSource;
typedef struct SExchangePhysiNode {
SPhysiNode node;
uint64_t srcTemplateId; // template id of datasource suplans
SArray* pSrcEndPoints; // SArray<SDownstreamSource>, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SDataSinkNode {
ENodeType type;;
SDataBlockDescNode inputDataBlockDesc;
} SDataSinkNode;
typedef struct SDataDispatcherNode {
SDataSinkNode sink;
} SDataDispatcherNode;
typedef struct SDataInserterNode {
SDataSinkNode sink;
int32_t numOfTables;
uint32_t size;
char *pData;
} SDataInserterNode;
#ifdef __cplusplus
}
#endif
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "nodes.h"
#include "tmsg.h"
typedef struct SRawExprNode {
ENodeType nodeType;
......@@ -248,6 +249,28 @@ typedef enum ESqlClause {
SQL_CLAUSE_ORDER_BY
} ESqlClause;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SVnodeModifOpStmt;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_NEW_PARSER_H_
#define _TD_NEW_PARSER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "parser.h"
typedef enum EStmtType {
STMT_TYPE_CMD = 1,
STMT_TYPE_QUERY
} EStmtType;
typedef struct SQuery {
EStmtType stmtType;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
} SQuery;
int32_t parser(SParseContext* pParseCxt, SQuery* pQuery);
#ifdef __cplusplus
}
#endif
#endif /*_TD_NEW_PARSER_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSENODES_H_
#define _TD_PARSENODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "function.h"
#include "tmsgtype.h"
#include "tname.h"
#include "tvariant.h"
/**
* The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode.
*/
typedef struct SQueryNode {
int16_t type;
} SQueryNode;
#define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
int16_t type; // normal column/tag/ user input constant column
} SColumnIndex;
// select statement
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SStateWindow stateWindow; // state window query
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
SMultiFunctionsDesc info;
SArray *pDownstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SVnodeModifOpStmtInfo {
int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SVnodeModifOpStmtInfo;
typedef struct SDclStmtInfo {
int16_t nodeType;
int16_t msgType;
SEpSet epSet;
char* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SDclStmtInfo;
#ifdef __cplusplus
}
#endif
#endif /*_TD_PARSENODES_H_*/
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#if 0
#include "parsenodes.h"
typedef struct SParseContext {
......@@ -94,6 +96,37 @@ int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
#else
#include "querynodes.h"
#include "tmsg.h"
typedef struct SParseContext {
uint64_t requestId;
int32_t acctId;
const char *db;
void *pTransporter;
SEpSet mgmtEpSet;
const char *pSql; // sql string
size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
} SParseContext;
typedef struct SQuery {
bool isCmd;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
void qDestroyQuery(SQuery* pQueryNode);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#if 0
#include "query.h"
#include "tmsg.h"
#include "tarray.h"
......@@ -207,6 +209,65 @@ void qDestroyQueryDag(SQueryDag* pDag);
char* qDagToString(const SQueryDag* pDag);
SQueryDag* qStringToDag(const char* pStr);
#else
#include "plannodes.h"
#include "query.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
typedef struct SSubplanId {
uint64_t queryId;
uint64_t templateId;
uint64_t subplanId;
} SSubplanId;
typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray* pChildren; // the datasource subplan,from which to fetch the result
SArray* pParents; // the data destination subplan, get data from current subplan
SPhysiNode* pNode; // physical plan of current subplan
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef struct SQueryPlan {
uint64_t queryId;
int32_t numOfSubplans;
SArray* pSubplans; // SArray*<SArray*<SSubplan*>>. The execution level of subplan, starting from 0.
} SQueryPlan;
typedef struct SPlanContext {
uint64_t queryId;
SNode* pAstRoot;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
// Convert to subplan to string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len);
int32_t qStringToSubplan(const char* str, SSubplan** subplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
void qDestroyQueryPlan(SQueryPlan* pPlan);
#endif
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if defined(INCLUDE_AS_ENUM) // enum define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) OP_##op,
#elif defined(INCLUDE_AS_NAME) // comment define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) #op,
#else
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan)
OP_ENUM_MACRO(StreamBlockScan)
OP_ENUM_MACRO(Aggregate)
OP_ENUM_MACRO(Project)
// OP_ENUM_MACRO(Groupby)
OP_ENUM_MACRO(Limit)
OP_ENUM_MACRO(SLimit)
OP_ENUM_MACRO(TimeWindow)
OP_ENUM_MACRO(SessionWindow)
OP_ENUM_MACRO(StateWindow)
OP_ENUM_MACRO(Fill)
OP_ENUM_MACRO(MultiTableAggregate)
OP_ENUM_MACRO(MultiTableTimeInterval)
OP_ENUM_MACRO(Filter)
OP_ENUM_MACRO(Distinct)
OP_ENUM_MACRO(Join)
OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange)
//OP_ENUM_MACRO(TableScan)
......@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +80,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, struct SSchJob** pJob);
/**
* Fetch query result from the remote query executor
......@@ -112,7 +112,7 @@ void schedulerDestroy(void);
* @param pTasks SArray**<STaskInfo>
* @return
*/
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks);
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks);
/**
* make one task info's multiple copies
......
......@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
taos
INTERFACE api
PRIVATE os util common transport parser planner catalog scheduler function qcom
PRIVATE os util common transport nodes parser planner catalog scheduler function qcom
)
if(${BUILD_TEST})
......
......@@ -172,7 +172,7 @@ typedef struct SRequestSendRecvBody {
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
......@@ -232,7 +232,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery);
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery);
// --- heartbeat
// global, called by mgmt
......
......@@ -184,7 +184,7 @@ static void doDestroyRequest(void *p) {
tfree(pRequest->pInfo);
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryDag(pRequest->body.pDag);
qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.showInfo.pArray != NULL) {
taosArrayDestroy(pRequest->body.showInfo.pArray);
......
......@@ -139,7 +139,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
......@@ -161,54 +161,57 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
}
code = qParseQuerySql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
}
tfree(cxt.db);
return code;
}
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
pRequest->type = pDcl->msgType;
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL};
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
if (pShowReqInfo->pArray == NULL) {
pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
pShowReqInfo->pArray = pDcl->pExtension;
}
}
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
} else {
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
}
tsem_wait(&pRequest->body.rspSem);
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
// SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
// pRequest->type = pDcl->msgType;
// pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL};
// STscObj* pTscObj = pRequest->pTscObj;
// SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
// int64_t transporterId = 0;
// if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
// if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
// SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
// if (pShowReqInfo->pArray == NULL) {
// pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
// pShowReqInfo->pArray = pDcl->pExtension;
// }
// }
// asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
// } else {
// asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
// }
// tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS;
}
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
pRequest->type = pQueryNode->type;
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pDag, SArray* pNodeList) {
// pRequest->type = pQuery->type;
SSchema* pSchema = NULL;
int32_t numOfCols = 0;
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
if (code != 0) {
return code;
}
// SSchema* pSchema = NULL;
// int32_t numOfCols = 0;
// int32_t code = qCreateQueryDag(pQuery, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
// if (code != 0) {
// return code;
// }
if (pQueryNode->type == TSDB_SQL_SELECT) {
setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
pRequest->type = TDMT_VND_QUERY;
}
// if (pQuery->type == TSDB_SQL_SELECT) {
// setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
// pRequest->type = TDMT_VND_QUERY;
// }
tfree(pSchema);
return code;
// tfree(pSchema);
// return code;
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
......@@ -224,7 +227,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
......@@ -254,24 +257,24 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
}
SRequestObj* pRequest = NULL;
SQueryNode* pQueryNode = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
SQuery* pQuery;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
if (qIsDdlQuery(pQueryNode)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
if (pQuery->isCmd) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
pRequest->code = terrno;
}
_return:
taosArrayDestroy(pNodeList);
qDestroyQuery(pQueryNode);
qDestroyQuery(pQuery);
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno;
}
......
......@@ -424,7 +424,7 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQueryNode* pQueryNode = NULL;
SQuery* pQueryNode = NULL;
char* pStr = NULL;
terrno = TSDB_CODE_SUCCESS;
......@@ -447,7 +447,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
}
tscDebug("start to create topic, %s", topicName);
#if 0
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
......@@ -503,7 +503,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
#endif
_return:
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
......
......@@ -760,9 +760,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SQueryPlan *pPlan = qStringToQueryPlan(pTopic->physicalPlan);
SArray *pArray = NULL;
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SArray *inner = taosArrayGet(pPlan->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
SArray *unassignedVg = pSub->unassignedVg;
......@@ -779,7 +779,7 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
if (schedulerConvertDagToTaskList(pPlan, &pArray) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
......
......@@ -22,6 +22,7 @@ extern "C" {
#include "common.h"
#include "dataSinkMgt.h"
#include "plannodes.h"
struct SDataSink;
struct SDataSinkHandle;
......@@ -45,7 +46,7 @@ typedef struct SDataSinkHandle {
FDestroyDataSinker fDestroy;
} SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
#ifdef __cplusplus
}
......
......@@ -39,7 +39,7 @@ typedef struct SDataCacheEntry {
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockSchema schema;
SDataBlockDescNode schema;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
......@@ -48,12 +48,13 @@ typedef struct SDataDispatchHandle {
pthread_mutex_t mutex;
} SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSchema) {
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
......@@ -70,13 +71,14 @@ static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) {
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
int32_t *compSizes = (int32_t*)data;
if (compressed) {
data += pSchema->numOfCols * sizeof(int32_t);
data += numOfCols * sizeof(int32_t);
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
if (compressed) {
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
......@@ -224,7 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
pthread_mutex_destroy(&pDispatcher->mutex);
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -236,7 +238,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema;
dispatcher->schema = pDataSink->inputDataBlockDesc;
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue();
......
......@@ -25,8 +25,8 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
pthread_mutex_init(&gDataSinkManager.mutex, NULL);
}
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
if (DSINK_Dispatch == pDataSink->info.type) {
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
if (QUERY_NODE_DSINK_DISPATCH == nodeType(pDataSink)) {
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
}
return TSDB_CODE_FAILED;
......
......@@ -20,7 +20,7 @@
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != OP_StreamScan) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
......
......@@ -5285,7 +5285,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = OP_Exchange;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -5365,7 +5365,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pInfo->scanFlag = MAIN_SCAN;
pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -5389,7 +5389,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -5452,7 +5452,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
pInfo->readerHandle = streamReadHandle;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = OP_StreamScan;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -6236,7 +6236,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
pOperator->name = "Order";
pOperator->operatorType = OP_Order;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7193,6 +7193,21 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
return TSDB_CODE_SUCCESS;
}
static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL);
*dst = *src;
dst->pExpr = exprdup(src->pExpr);
dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn));
memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols);
memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param));
for (int32_t j = 0; j < src->base.numOfParams; ++j) {
taosVariantAssign(&dst->base.param[j], &src->base.param[j]);
}
}
static SExprInfo* exprArrayDup(SArray* pExprInfo) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
......@@ -7215,7 +7230,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
pOperator->operatorType = OP_Aggregate;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7316,7 +7331,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate";
pOperator->operatorType = OP_MultiTableAggregate;
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7690,7 +7705,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator";
pOperator->operatorType = OP_SLimit;
// pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
// pOperator->exec = doSLimit;
......@@ -7846,7 +7861,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTableTagScan";
pOperator->operatorType = OP_TagScan;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -8164,25 +8179,25 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
return pTaskInfo;
}
static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId);
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
} else if (pPhyNode->info.type == OP_Exchange) {
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
// SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode;
// return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
......@@ -8198,58 +8213,58 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
taosArrayPush(idList, &pkeyInfo->uid);
}
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pPhyNode->pTargets, idList, pTaskInfo);
taosArrayDestroy(idList);
// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo);
// taosArrayDestroy(idList);
//TODO destroy groupInfo
return pOperator;
// //TODO destroy groupInfo
// return pOperator;
}
}
if (pPhyNode->info.type == OP_Aggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
// TODO single table agg
for (int32_t i = 0; i < size; ++i) {
SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
// return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
} else if (pPhyNode->info.type == OP_MultiTableAggregate) {
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}
}*/
}
static tsdbReaderT createDataReaderImpl(STableScanPhyNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) {
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) {
STsdbQueryCond cond = {.loadExternalRows = false};
cond.order = pTableScanNode->scan.order;
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
cond.twindow = pTableScanNode->window;
cond.twindow = pTableScanNode->scanRange;
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
for (int32_t i = 0; i < cond.numOfCols; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE);
// SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
// assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE);
SSchema* pSchema = pExprInfo->pExpr->pSchema;
cond.colList[i].type = pSchema->type;
cond.colList[i].bytes = pSchema->bytes;
cond.colList[i].colId = pSchema->colId;
// SSchema* pSchema = pExprInfo->pExpr->pSchema;
// cond.colList[i].type = pSchema->type;
// cond.colList[i].bytes = pSchema->bytes;
// cond.colList[i].colId = pSchema->colId;
}
return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId);
......@@ -8266,7 +8281,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
return code;
}
static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
STableGroupInfo groupInfo = {0};
uint64_t uid = pTableScanNode->scan.uid;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_ASTGENERATOR_H
#define TDENGINE_ASTGENERATOR_H
#ifdef __cplusplus
extern "C" {
#endif
#include "ttoken.h"
#include "tvariant.h"
#include "parser.h"
#define ParseTOKENTYPE SToken
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
enum SQL_NODE_TYPE {
SQL_NODE_TABLE_COLUMN= 1,
SQL_NODE_SQLFUNCTION = 2,
SQL_NODE_VALUE = 3,
SQL_NODE_EXPR = 4,
};
enum SQL_FROM_NODE_TYPE {
SQL_FROM_NODE_SUBQUERY = 1,
SQL_FROM_NODE_TABLES = 2,
};
enum SQL_UNION_TYPE {
SQL_TYPE_UNIONALL = 1,
SQL_TYPE_UNION = 2,
};
extern char tTokenTypeSwitcher[13];
#define toTSDBType(x) \
do { \
if ((x) >= tListLen(tTokenTypeSwitcher)) { \
(x) = TSDB_DATA_TYPE_BINARY; \
} else { \
(x) = tTokenTypeSwitcher[(x)]; \
} \
} while (0)
#define TPARSER_HAS_TOKEN(_t) ((_t).n > 0)
#define TPARSER_SET_NONE_TOKEN(_t) ((_t).n = 0)
typedef struct SListItem {
SVariant pVar;
uint8_t sortOrder;
} SListItem;
typedef struct SIntervalVal {
int32_t token;
SToken interval;
SToken offset;
} SIntervalVal;
typedef struct SSessionWindowVal {
SToken col;
SToken gap;
} SSessionWindowVal;
typedef struct SWindowStateVal {
SToken col;
} SWindowStateVal;
struct SRelationInfo;
typedef struct SSqlNode {
struct SRelationInfo *from; // from clause SArray<SSqlNode>
struct SArray *pSelNodeList; // select clause
struct tSqlExpr *pWhere; // where clause [optional]
SArray *pGroupby; // groupby clause, only for tags[optional], SArray<SListItem>
SArray *pSortOrder; // orderby [optional], SArray<SListItem>
SArray *fillType; // fill type[optional], SArray<SListItem>
SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional]
SWindowStateVal windowstateVal; // window_state(col) [optional]
SToken sliding; // sliding window [optional]
SLimit limit; // limit offset [optional]
SLimit slimit; // group limit offset [optional]
SToken sqlstr; // sql string in select clause
struct tSqlExpr *pHaving; // having clause [optional]
} SSqlNode;
typedef struct SSubclause {
int32_t unionType;
SArray *node;
} SSubclause;
typedef struct SRelElement {
union {
SToken tableName;
SSubclause *pSubquery;
};
SToken aliasName;
} SRelElement;
typedef struct SRelationInfo {
int32_t type; // nested query|table name list
SArray *list; // SArray<SRelElement>
} SRelationInfo;
typedef struct SCreatedTableInfo {
SToken name; // table name token
SToken stbName; // super table name token , for using clause
SArray *pTagNames; // create by using super table, tag name
SArray *pTagVals; // create by using super table, tag value. SArray<SToken>
char *fullname; // table full name
int8_t igExist; // ignore if exists
} SCreatedTableInfo;
typedef struct SCreateTableSql {
SToken name; // table name, create table [name] xxx
int8_t type; // create normal table/from super table/ stream
bool existCheck;
struct {
SArray *pTagColumns; // SArray<SField>
SArray *pColumns; // SArray<SField>
} colInfo;
SArray *childTableInfo; // SArray<SCreatedTableInfo>
SSqlNode *pSelect;
} SCreateTableSql;
typedef struct SAlterTableInfo {
SToken name;
int16_t tableType;
int16_t type;
STagData tagData;
SArray *pAddColumns; // SArray<SField>
SArray *varList; // set t=val or: change src dst, SArray<SListItem>
} SAlterTableInfo;
typedef struct SCreateDbInfo {
SToken dbname;
int32_t replica;
int32_t cacheBlockSize;
int32_t numOfVgroups;
int32_t numOfBlocks;
int32_t daysPerFile;
int32_t minRowsPerBlock;
int32_t maxRowsPerBlock;
int32_t fsyncPeriod;
int64_t commitTime;
int32_t walLevel;
int32_t quorum;
int32_t compressionLevel;
SToken precision;
bool ignoreExists;
int8_t update;
int8_t cachelast;
SArray *keep;
int8_t streamMode;
} SCreateDbInfo;
typedef struct SCreateFuncInfo {
SToken name;
SToken path;
int32_t type;
int32_t bufSize;
SField output;
} SCreateFuncInfo;
typedef struct SCreateAcctInfo {
int32_t maxUsers;
int32_t maxDbs;
int32_t maxTimeSeries;
int32_t maxStreams;
int32_t maxPointsPerSecond;
int64_t maxStorage;
int64_t maxQueryTime;
int32_t maxConnections;
SToken stat;
} SCreateAcctInfo;
typedef struct SShowInfo {
uint8_t showType;
SToken prefix;
SToken pattern;
} SShowInfo;
typedef struct SUserInfo {
SToken user;
SToken passwd;
SToken privilege;
int16_t type;
} SUserInfo;
typedef struct SMiscInfo {
SArray *a; // SArray<SToken>
bool existsCheck;
int16_t dbType;
int16_t tableType;
SUserInfo user;
union {
SCreateDbInfo dbOpt;
SCreateAcctInfo acctOpt;
SCreateFuncInfo funcOpt;
SShowInfo showOpt;
SToken id;
};
} SMiscInfo;
typedef struct SSqlInfo {
int32_t type;
bool valid;
SSubclause sub;
char msg[256];
SArray *funcs;
union {
SCreateTableSql *pCreateTableInfo;
SAlterTableInfo *pAlterInfo;
SMiscInfo *pMiscInfo;
};
} SSqlInfo;
typedef struct tSqlExpr {
uint16_t type; // sql node type
uint32_t tokenId; // TK_LE: less than(binary expr)
// The complete string of the function(col, param), and the function name is kept in exprToken
struct {
SToken operand;
struct SArray *paramList; // function parameters list
} Expr;
SToken columnName; // table column info
SVariant value; // the use input value
SToken exprToken; // original sql expr string or function name of sql function
struct tSqlExpr *pLeft; // the left child
struct tSqlExpr *pRight; // the right child
} tSqlExpr;
// used in select clause. select <SArray> from xxx
typedef struct tSqlExprItem {
tSqlExpr *pNode; // The list of expressions
int32_t functionId;
char *aliasName; // alias name, null-terminated string
bool distinct;
} tSqlExprItem;
SArray *tListItemAppend(SArray *pList, SVariant *pVar, uint8_t sortOrder);
SArray *tListItemInsert(SArray *pList, SVariant *pVar, uint8_t sortOrder, int32_t index);
SArray *tListItemAppendToken(SArray *pList, SToken *pAliasToken, uint8_t sortOrder);
SRelationInfo *setTableNameList(SRelationInfo *pRelationInfo, SToken *pName, SToken *pAlias);
void * destroyRelationInfo(SRelationInfo *pFromInfo);
SRelationInfo *addSubquery(SRelationInfo *pRelationInfo, SSubclause *pSub, SToken *pAlias);
// sql expr leaf node
tSqlExpr *tSqlExprCreateIdValue(SToken *pToken, int32_t optrType);
tSqlExpr *tSqlExprCreateFunction(SArray *pParam, SToken *pFuncToken, SToken *endToken, int32_t optType);
SArray * tRecordFuncName(SArray *pList, SToken *pToken);
tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType);
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc);
void tSqlExprCompact(tSqlExpr **pExpr);
bool tSqlExprIsLeaf(tSqlExpr *pExpr);
bool tSqlExprIsParentOfLeaf(tSqlExpr *pExpr);
void tSqlExprDestroy(tSqlExpr *pExpr);
SArray * tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SToken *pDistinct, SToken *pToken);
void tSqlExprListDestroy(SArray *pList);
void tSqlExprEvaluate(tSqlExpr* pExpr);
SSqlNode *tSetQuerySqlNode(SToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SWindowStateVal *pw, SToken *pSliding, SArray *pFill, SLimit *pLimit, SLimit *pgLimit, tSqlExpr *pHaving);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSelect, int32_t type);
SAlterTableInfo * tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableType);
SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames, SArray *pTagVals, SToken *pToken,
SToken *igExists);
/*!
* test
* @param pSqlNode
*/
void destroyAllSqlNode(struct SSubclause *pSqlNode);
void destroySqlNode(SSqlNode *pSql);
void freeCreateTableInfo(void* p);
SSqlInfo *setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SToken *pTableName, int32_t type);
SSubclause* setSubclause(SSubclause* sub, void *pSqlNode);
SSubclause* appendSelectClause(SSubclause *sub, int32_t unionType, void *pSubclause);
void setCreatedTableName(SSqlInfo *pInfo, SToken *pTableNameToken, SToken *pIfNotExists);
void* destroyCreateTableSql(SCreateTableSql* pCreate);
void setDropFuncInfo(SSqlInfo *pInfo, int32_t type, SToken* pToken);
void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SToken *pName, SToken *pPath, SField *output, SToken* bufSize, int32_t funcType);
void destroySqlInfo(SSqlInfo *pInfo);
void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SToken* pToken, SToken* existsCheck,int16_t dbType,int16_t tableType);
void setShowOptions(SSqlInfo *pInfo, int32_t type, SToken* prefix, SToken* pPatterns);
void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SToken *pToken, SCreateDbInfo *pDB, SToken *pIgExists);
void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SToken *pName, SToken *pPwd, SCreateAcctInfo *pAcctInfo);
void setCreateUserSql(SSqlInfo *pInfo, SToken *pName, SToken *pPasswd);
void setKillSql(SSqlInfo *pInfo, int32_t type, SToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SToken *pName, SToken* pPwd, SToken *pPrivilege);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);
// prefix show db.tables;
void tSetDbName(SToken *pCpxName, SToken *pDb);
void tSetColumnInfo(struct SField *pField, SToken *pName, struct SField *pType);
void tSetColumnType(struct SField *pField, SToken *type);
/**
* The main parse function.
* @param yyp The parser
* @param yymajor The major token code number
* @param yyminor The value for the token
*/
void Parse(void *yyp, int yymajor, ParseTOKENTYPE yyminor, SSqlInfo *);
/**
* Free the allocated resources in case of failure.
* @param p The parser to be deleted
* @param freeProc Function used to reclaim memory
*/
void ParseFree(void *p, void (*freeProc)(void *));
/**
* Allocated callback function.
* @param mallocProc The parser allocator
* @return
*/
void *ParseAlloc(void *(*mallocProc)(size_t));
/**
*
* @param str sql string
* @return sql ast
*/
SSqlInfo doGenerateAST(const char *str);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_ASTGENERATOR_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_ASTTOMSG_H
#define TDENGINE_ASTTOMSG_H
#include "parserInt.h"
#include "tmsg.h"
char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
char* buildShowMsg(SShowInfo* pShowInfo, int32_t* outputLen, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, int32_t* outputLen, SParseContext* pCtx, SMsgBuf* pMsgBuf);
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* outputLen, SMsgBuf* pMsgBuf);
char* buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* outputLen, SMsgBuf* pMsgBuf);
#endif // TDENGINE_ASTTOMSG_H
......@@ -22,7 +22,7 @@ extern "C" {
#include "parser.h"
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
#ifdef __cplusplus
}
......
......@@ -20,11 +20,11 @@
extern "C" {
#endif
#include "querynodes.h"
#include "newParser.h"
#include "parser.h"
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t parseQuerySql(SParseContext* pCxt, SQuery** pQuery);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSER_INT_H_
#define _TD_PARSER_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
#include "tname.h"
#include "astGenerator.h"
struct SSqlNode;
typedef struct SInternalField {
TAOS_FIELD field;
bool visible;
SExprInfo *pExpr;
} SInternalField;
typedef struct SMsgBuf {
int32_t len;
char *buf;
} SMsgBuf;
void clearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
void clearAllTableMetaInfo(SQueryStmtInfo* pQueryInfo, bool removeMeta, uint64_t id);
/**
* Validate the sql info, according to the corresponding metadata info from catalog.
* @param pCtx
* @param pInfo
* @param pQueryInfo
* @param msgBuf
* @param msgBufLen
* @return
*/
int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen);
/**
* validate the ddl ast, and convert the ast to the corresponding message format
* @param pSqlInfo
* @param output
* @param type
* @return
*/
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, char* msgBuf, int32_t msgBufLen);
/**
*
* @param pInfo
* @param pCtx
* @param msgBuf
* @param msgBufLen
* @return
*/
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, char* msgBuf, int32_t msgBufLen);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
* @param pNode
* @param tsPrecision
* @param msg
* @param msgBufLen
* @return
*/
int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf);
int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
SQueryStmtInfo* createQueryInfo();
void destroyQueryInfo(SQueryStmtInfo* pQueryInfo);
int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
/**
* Extract request meta info from the sql statement
* @param pSqlInfo
* @param pMetaInfo
* @param msg
* @param msgBufLen
* @return
*/
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseContext *pCtx, char* msg, int32_t msgBufLen);
/**
* Destroy the meta data request structure.
* @param pMetaInfo
*/
void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_PARSER_INT_H_*/
\ No newline at end of file
......@@ -21,54 +21,19 @@ extern "C" {
#endif
#include "os.h"
#include "query.h"
#include "tmsg.h"
#include "ttoken.h"
#include "parserInt.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
TAOS_FIELD createField(const SSchema* pSchema);
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
SInternalField* insertFieldInfo(SFieldInfo* pFieldInfo, int32_t index, SSchema* field);
int32_t getNumOfFields(SFieldInfo* pFieldInfo);
SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index);
int32_t parserValidateIdToken(SToken* pToken);
int32_t parserValidatePassword(SToken* pToken, SMsgBuf* pMsgBuf);
int32_t parserValidateNameToken(SToken* pToken);
typedef struct SMsgBuf {
int32_t len;
char *buf;
} SMsgBuf;
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
void columnListCopyAll(SArray* dst, const SArray* src);
SColumn* columnListInsert(SArray* pColumnList, uint64_t uid, SSchema* pSchema, int32_t flag);
SColumn* insertPrimaryTsColumn(SArray* pColumnList, const char* colName, uint64_t tableUid);
void cleanupTagCond(STagCond* pTagCond);
void cleanupColumnCond(SArray** pCond);
uint32_t convertRelationalOperator(SToken *pToken);
int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
bool isDclSqlStatement(SSqlInfo* pSqlInfo);
bool isDdlSqlStatement(SSqlInfo* pSqlInfo);
bool isDqlSqlStatement(SSqlInfo* pSqlInfo);
int32_t parserValidateIdToken(SToken* pToken);
typedef struct SKvParam {
SKVRowBuilder *builder;
......@@ -78,10 +43,12 @@ typedef struct SKvParam {
int32_t KvRowAppend(const void *value, int32_t len, void *param);
typedef int32_t (*_row_append_fn_t)(const void *value, int32_t len, void *param);
int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf);
int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
SSchema *getTableColumnSchema(const STableMeta *pTableMeta);
SSchema *getTableTagSchema(const STableMeta* pTableMeta);
int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYINFOUTIL_H
#define TDENGINE_QUERYINFOUTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "parserInt.h"
SSchema* getTbnameColumnSchema();
int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta);
SSchema *getTableColumnSchema(const STableMeta *pTableMeta);
SSchema *getTableTagSchema(const STableMeta* pTableMeta);
SArray *getCurrentExprList(SQueryStmtInfo* pQueryInfo);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level);
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize);
SExprInfo* getExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index);
void addExprInfoParam(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
void cleanupFieldInfo(SFieldInfo* pFieldInfo);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
SArray *extractFunctionList(SArray* pExprInfoList);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERYINFOUTIL_H
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "astGenerator.h"
#include "parserInt.h"
#include "parserUtil.h"
char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) {
SCreateUserReq createReq = {0};
SUserInfo* pUser = &pInfo->pMiscInfo->user;
strncpy(createReq.user, pUser->user.z, pUser->user.n);
createReq.createType = pUser->type;
createReq.superUser = (int8_t)pUser->type;
if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
// pMsg->privilege = (char)pCmd->count;
} else {
strncpy(createReq.pass, pUser->passwd.z, pUser->passwd.n);
}
int32_t tlen = tSerializeSCreateUserReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSCreateUserReq(pReq, tlen, &createReq);
*outputLen = tlen;
return pReq;
}
char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) {
SCreateAcctReq createReq = {0};
SToken* pName = &pInfo->pMiscInfo->user.user;
SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
strncpy(createReq.user, pName->z, pName->n);
strncpy(createReq.pass, pPwd->z, pPwd->n);
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
createReq.maxUsers = pAcctOpt->maxUsers;
createReq.maxDbs = pAcctOpt->maxDbs;
createReq.maxTimeSeries = pAcctOpt->maxTimeSeries;
createReq.maxStreams = pAcctOpt->maxStreams;
createReq.maxStorage = pAcctOpt->maxStorage;
if (pAcctOpt->stat.n == 0) {
createReq.accessState = -1;
} else {
if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
createReq.accessState = TSDB_VN_READ_ACCCESS;
} else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
createReq.accessState = TSDB_VN_WRITE_ACCCESS;
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
createReq.accessState = TSDB_VN_ALL_ACCCESS;
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
createReq.accessState = 0;
}
}
int32_t tlen = tSerializeSCreateAcctReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSCreateAcctReq(pReq, tlen, &createReq);
*outputLen = tlen;
return pReq;
}
char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgBufLen) {
SDropUserReq dropReq = {0};
SToken* pName = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (pName->n >= TSDB_USER_LEN) {
return NULL;
}
strncpy(dropReq.user, pName->z, pName->n);
int32_t tlen = tSerializeSDropUserReq(NULL, 0, &dropReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSDropUserReq(pReq, tlen, &dropReq);
*outputLen = tlen;
return pReq;
}
char* buildShowMsg(SShowInfo* pShowInfo, int32_t* outputLen, SParseContext* pCtx, SMsgBuf* pMsgBuf) {
SShowReq showReq = {.type = pShowInfo->showType};
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type > 0) { // only show tables support wildcard query
showReq.payloadLen = pPattern->n;
showReq.payload = malloc(showReq.payloadLen);
strncpy(showReq.payload, pPattern->z, pPattern->n);
}
} else {
SToken* pEpAddr = &pShowInfo->prefix;
assert(pEpAddr->n > 0 && pEpAddr->type > 0);
showReq.payloadLen = pEpAddr->n;
showReq.payload = malloc(showReq.payloadLen);
strncpy(showReq.payload, pEpAddr->z, pEpAddr->n);
}
if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) {
SName n = {0};
if (pShowInfo->prefix.n > 0) {
if (pShowInfo->prefix.n >= TSDB_DB_FNAME_LEN) {
terrno = buildInvalidOperationMsg(pMsgBuf, "prefix name is too long");
tFreeSShowReq(&showReq);
return NULL;
}
tNameSetDbName(&n, pCtx->acctId, pShowInfo->prefix.z, pShowInfo->prefix.n);
} else if (pCtx->db == NULL || strlen(pCtx->db) == 0) {
terrno = buildInvalidOperationMsg(pMsgBuf, "database is not specified");
tFreeSShowReq(&showReq);
return NULL;
} else {
tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db));
}
tNameGetFullDbName(&n, showReq.db);
}
int32_t tlen = tSerializeSShowReq(NULL, 0, &showReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSShowReq(pReq, tlen, &showReq);
tFreeSShowReq(&showReq);
*outputLen = tlen;
return pReq;
}
static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid number of keep options";
const char* msg2 = "invalid keep value";
const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2";
pMsg->daysToKeep0 = -1;
pMsg->daysToKeep1 = -1;
pMsg->daysToKeep2 = -1;
SArray* pKeep = pCreateDb->keep;
if (pKeep != NULL) {
size_t s = taosArrayGetSize(pKeep);
#ifdef _STORAGE
if (s >= 4 || s <= 0) {
#else
if (s != 1) {
#endif
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
// tListI* p0 = taosArrayGet(pKeep, 0);
// tVariantListItem* p1 = (s > 1) ? taosArrayGet(pKeep, 1) : p0;
// tVariantListItem* p2 = (s > 2) ? taosArrayGet(pKeep, 2) : p1;
//
// if ((int32_t)p0->pVar.i64 <= 0 || (int32_t)p1->pVar.i64 <= 0 || (int32_t)p2->pVar.i64 <= 0) {
// return buildInvalidOperationMsg(pMsgBuf, msg2);
// }
// if (!(((int32_t)p0->pVar.i64 <= (int32_t)p1->pVar.i64) && ((int32_t)p1->pVar.i64 <= (int32_t)p2->pVar.i64))) {
// return buildInvalidOperationMsg(pMsgBuf, msg3);
// }
//
// pMsg->daysToKeep0 = htonl((int32_t)p0->pVar.i64);
// pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64);
// pMsg->daysToKeep2 = htonl((int32_t)p2->pVar.i64);
}
return TSDB_CODE_SUCCESS;
}
static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) {
const char* msg = "invalid time precision";
pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default
SToken* pToken = (SToken*)&pCreateDbInfo->precision;
if (pToken->n > 0) {
pToken->n = strdequote(pToken->z);
if (strncmp(pToken->z, TSDB_TIME_PRECISION_MILLI_STR, pToken->n) == 0 &&
strlen(TSDB_TIME_PRECISION_MILLI_STR) == pToken->n) {
// time precision for this db: million second
pMsg->precision = TSDB_TIME_PRECISION_MILLI;
} else if (strncmp(pToken->z, TSDB_TIME_PRECISION_MICRO_STR, pToken->n) == 0 &&
strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) {
pMsg->precision = TSDB_TIME_PRECISION_MICRO;
} else if (strncmp(pToken->z, TSDB_TIME_PRECISION_NANO_STR, pToken->n) == 0 &&
strlen(TSDB_TIME_PRECISION_NANO_STR) == pToken->n) {
pMsg->precision = TSDB_TIME_PRECISION_NANO;
} else {
return buildInvalidOperationMsg(pMsgBuf, msg);
}
}
return TSDB_CODE_SUCCESS;
}
static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->cacheBlockSize = pCreateDb->cacheBlockSize;
pMsg->totalBlocks = pCreateDb->numOfBlocks;
pMsg->daysPerFile = pCreateDb->daysPerFile;
pMsg->commitTime = (int32_t)pCreateDb->commitTime;
pMsg->minRows = pCreateDb->minRowsPerBlock;
pMsg->maxRows = pCreateDb->maxRowsPerBlock;
pMsg->fsyncPeriod = pCreateDb->fsyncPeriod;
pMsg->compression = (int8_t)pCreateDb->compressionLevel;
pMsg->walLevel = (char)pCreateDb->walLevel;
pMsg->replications = pCreateDb->replica;
pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->numOfVgroups = pCreateDb->numOfVgroups;
pMsg->streamMode = pCreateDb->streamMode;
}
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
doSetDbOptions(pCreateDbMsg, pCreateDbSql);
if (setKeepOption(pCreateDbMsg, pCreateDbSql, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (setTimePrecision(pCreateDbMsg, pCreateDbSql, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
return TSDB_CODE_SUCCESS;
}
// can only perform the parameters based on the macro definitation
static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) {
char msg[512] = {0};
if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option walLevel: %d, only 1-2 allowed", pCreate->walLevel);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->replications != -1 &&
(pCreate->replications < TSDB_MIN_DB_REPLICA_OPTION || pCreate->replications > TSDB_MAX_DB_REPLICA_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications,
TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t blocks = pCreate->totalBlocks;
if (blocks != -1 && (blocks < TSDB_MIN_TOTAL_BLOCKS || blocks > TSDB_MAX_TOTAL_BLOCKS)) {
snprintf(msg, tListLen(msg), "invalid db option totalBlocks: %d valid range: [%d, %d]", blocks,
TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->quorum != -1 &&
(pCreate->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCreate->quorum > TSDB_MAX_DB_QUORUM_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option quorum: %d valid range: [%d, %d]", pCreate->quorum,
TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t val = pCreate->daysPerFile;
if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) {
snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE,
TSDB_MAX_DAYS_PER_FILE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->cacheBlockSize;
if (val != -1 && (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE)) {
snprintf(msg, tListLen(msg), "invalid db option cacheBlockSize: %d valid range: [%d, %d]", val,
TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->precision != TSDB_TIME_PRECISION_MILLI && pCreate->precision != TSDB_TIME_PRECISION_MICRO &&
pCreate->precision != TSDB_TIME_PRECISION_NANO) {
snprintf(msg, tListLen(msg), "invalid db option timePrecision: %d valid value: [%d, %d, %d]", pCreate->precision,
TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->commitTime;
if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) {
snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, TSDB_MIN_COMMIT_TIME,
TSDB_MAX_COMMIT_TIME);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->fsyncPeriod;
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD,
TSDB_MAX_FSYNC_PERIOD);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->compression != -1 &&
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->numOfVgroups;
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
}
val = pCreate->maxRows;
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
}
val = pCreate->minRows;
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
}
return TSDB_CODE_SUCCESS;
}
char* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, int32_t* outputLen, SParseContext* pCtx, SMsgBuf* pMsgBuf) {
SCreateDbReq createReq = {0};
if (setDbOptions(&createReq, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
}
SName name = {0};
int32_t ret = tNameSetDbName(&name, pCtx->acctId, pCreateDbInfo->dbname.z, pCreateDbInfo->dbname.n);
if (ret != TSDB_CODE_SUCCESS) {
terrno = ret;
return NULL;
}
tNameGetFullDbName(&name, createReq.db);
if (doCheckDbOptions(&createReq, pMsgBuf) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
}
int32_t tlen = tSerializeSCreateDbReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSCreateDbReq(pReq, tlen, &createReq);
*outputLen = tlen;
return pReq;
}
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SParseContext* pParseCtx,
SMsgBuf* pMsgBuf) {
SMCreateStbReq createReq = {0};
createReq.igExists = pCreateTableSql->existCheck ? 1 : 0;
createReq.pColumns = pCreateTableSql->colInfo.pColumns;
createReq.pTags = pCreateTableSql->colInfo.pTagColumns;
createReq.numOfColumns = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
createReq.numOfTags = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
SName n = {0};
if (createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf) != 0) {
return NULL;
}
if (tNameExtractFullName(&n, createReq.name) != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSMCreateStbReq(pReq, tlen, &createReq);
*outputLen = tlen;
return pReq;
}
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name");
return NULL;
}
SMDropStbReq dropReq = {0};
code = tNameExtractFullName(&name, dropReq.name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
int32_t tlen = tSerializeSMDropStbReq(NULL, 0, &dropReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSMDropStbReq(pReq, tlen, &dropReq);
*outputLen = tlen;
return pReq;
}
char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* outputLen, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid host name (name too long, maximum length 128)";
const char* msg2 = "dnode name can not be string";
const char* msg3 = "port should be an integer that is less than 65535 and greater than 0";
const char* msg4 = "failed prepare create dnode message";
if (taosArrayGetSize(pInfo->pMiscInfo->a) != 2) {
buildInvalidOperationMsg(pMsgBuf, msg1);
return NULL;
}
SToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (id->type != TK_ID && id->type != TK_IPTOKEN) {
buildInvalidOperationMsg(pMsgBuf, msg2);
return NULL;
}
SToken* port = taosArrayGet(pInfo->pMiscInfo->a, 1);
if (port->type != TK_INTEGER) {
buildInvalidOperationMsg(pMsgBuf, msg3);
return NULL;
}
bool isSign = false;
int64_t val = 0;
toInteger(port->z, port->n, 10, &val, &isSign);
if (val >= UINT16_MAX || val <= 0) {
buildInvalidOperationMsg(pMsgBuf, msg3);
return NULL;
}
SCreateDnodeReq createReq = {0};
strncpy(createReq.fqdn, id->z, id->n);
createReq.port = val;
int32_t tlen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSCreateDnodeReq(pReq, tlen, &createReq);
*outputLen = tlen;
return pReq;
}
char* buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* outputLen, SMsgBuf* pMsgBuf) {
SDropDnodeReq dropReq = {0};
SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0);
char* end = NULL;
dropReq.dnodeId = strtoll(pzName->z, &end, 10);
if (end - pzName->z != pzName->n) {
buildInvalidOperationMsg(pMsgBuf, "invalid dnode id");
return NULL;
}
int32_t tlen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSDropDnodeReq(pReq, tlen, &dropReq);
*outputLen = tlen;
return pReq;
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
......@@ -17,8 +17,7 @@
#include "catalog.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tmsg.h"
#include "querynodes.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......
......@@ -16,9 +16,7 @@
#include "insertParser.h"
#include "dataBlockMgt.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttoken.h"
......@@ -61,9 +59,14 @@ typedef struct SInsertParseContext {
SArray* pTableDataBlocks; // global
SArray* pVgDataBlocks; // global
int32_t totalNum;
SVnodeModifOpStmtInfo* pOutput;
SVnodeModifOpStmt* pOutput;
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(const void *value, int32_t len, void *param);
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
......@@ -95,6 +98,66 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
return TSDB_CODE_SUCCESS;
}
static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
const char* msg1 = "name too long";
const char* msg2 = "invalid database name";
const char* msg3 = "db is not specified";
int32_t code = TSDB_CODE_SUCCESS;
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
if (p != NULL) { // db has been specified in sql string so we ignore current db path
assert(*p == TS_PATH_DELIMITER[0]);
int32_t dbLen = p - pTableName->z;
char name[TSDB_DB_FNAME_LEN] = {0};
strncpy(name, pTableName->z, dbLen);
dbLen = strdequote(name);
code = tNameSetDbName(pName, pParseCtx->acctId, name, dbLen);
if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
int32_t tbLen = pTableName->n - dbLen - 1;
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(tbname, p + 1, tbLen);
/*tbLen = */strdequote(tbname);
code = tNameFromString(pName, tbname, T_NAME_TABLE);
if (code != 0) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
} else { // get current DB name first, and then set it into path
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
assert(pTableName->n < TSDB_TABLE_FNAME_LEN);
char name[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(name, pTableName->z, pTableName->n);
strdequote(name);
if (pParseCtx->db == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db));
if (code != TSDB_CODE_SUCCESS) {
code = buildInvalidOperationMsg(pMsgBuf, msg2);
return code;
}
code = tNameFromString(pName, name, T_NAME_TABLE);
if (code != 0) {
code = buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
return code;
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SName name = {0};
createSName(&name, pTname, pCxt->pComCxt, &pCxt->msg);
......@@ -258,6 +321,231 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
if ((pToken->type != TK_NOW && pToken->type != TK_INTEGER && pToken->type != TK_STRING && pToken->type != TK_FLOAT && pToken->type != TK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_HEX && pToken->type != TK_OCT && pToken->type != TK_BIN) ||
(pToken->n == 0) || (pToken->type == TK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
}
if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
}
// Remove quotation marks
if (TSDB_DATA_TYPE_BINARY == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
}
// delete escape character: \\, \', \"
char delim = pToken->z[0];
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < pToken->n - 1; ++k) {
if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
tmpTokenBuf[j] = pToken->z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = pToken->z[k];
j++;
}
tmpTokenBuf[j] = 0;
pToken->z = tmpTokenBuf;
pToken->n -= 2 + cnt;
}
return TSDB_CODE_SUCCESS;
}
static bool isNullStr(SToken *pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
return TK_ILLEGAL;
}
return pToken->type;
}
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
int64_t iv;
char *endptr = NULL;
bool isSigned = false;
int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(&tmpVal, pSchema->bytes, param);
}
return func(getNullValue(pSchema->type), 0, param);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
return func(&TRUE_VALUE, pSchema->bytes, param);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
return func(&FALSE_VALUE, pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_FLOAT) {
return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UTINYINT:{
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
}
return func(&iv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
}
uint64_t tmpVal = (uint64_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_FLOAT: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
return func(&dv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BINARY: {
// Too long values will raise the invalid sql error message
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_NCHAR: {
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal;
if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
}
return func(&tmpVal, pSchema->bytes, param);
}
}
return TSDB_CODE_FAILED;
}
typedef struct SMemParam {
SRowBuilder* rb;
SSchema* schema;
......@@ -634,7 +922,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
SInsertParseContext context = {
.pComCxt = pContext,
.pSql = (char*) pContext->pSql,
......@@ -643,7 +931,7 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.totalNum = 0,
.pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
};
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
......@@ -651,8 +939,7 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
*pInfo = context.pOutput;
context.pOutput->nodeType = TSDB_SQL_INSERT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context);
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "astGenerator.h"
#include "parserInt.h"
#include "parserUtil.h"
......@@ -260,3 +261,36 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
destroyQueryInfo(pQueryStmtInfo);
}
}
#else
#include "parser.h"
#include "insertParser.h"
#include "parserImpl.h"
#include "ttoken.h"
static bool isInsertSql(const char* pStr, size_t length) {
int32_t index = 0;
do {
SToken t0 = tStrGetToken((char*) pStr, &index, false);
if (t0.type != TK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT;
}
} while (1);
}
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
return parseInsertSql(pCxt, pQuery);
} else {
return parseQuerySql(pCxt, pQuery);
}
}
void qDestroyQuery(SQuery* pQueryNode) {
// todo
}
#endif
......@@ -16,9 +16,11 @@
#include "parserImpl.h"
#include "astCreateContext.h"
#include "catalog.h"
#include "functionMgt.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "tglobal.h"
#include "tname.h"
#include "ttime.h"
#include "ttoken.h"
......@@ -174,20 +176,20 @@ static uint32_t getToken(const char* z, uint32_t* tokenId) {
return n;
}
static EStmtType getStmtType(const SNode* pRootNode) {
static bool isCmd(const SNode* pRootNode) {
if (NULL == pRootNode) {
return STMT_TYPE_CMD;
return true;
}
switch (nodeType(pRootNode)) {
case QUERY_NODE_SELECT_STMT:
return STMT_TYPE_QUERY;
return false;
default:
break;
}
return STMT_TYPE_CMD;
return true;
}
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery) {
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt;
createAstCreateContext(pParseCxt, &cxt);
void *pParser = NewParseAlloc(malloc);
......@@ -236,8 +238,11 @@ int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery) {
abort_parse:
NewParseFree(pParser, free);
destroyAstCreateContext(&cxt);
pQuery->stmtType = getStmtType(cxt.pRootNode);
pQuery->pRoot = cxt.pRootNode;
if (cxt.valid) {
*pQuery = calloc(1, sizeof(SQuery));
(*pQuery)->isCmd = isCmd(cxt.pRootNode);
(*pQuery)->pRoot = cxt.pRootNode;
}
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
......@@ -1054,16 +1059,16 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(&cxt, pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code && STMT_TYPE_QUERY == pQuery->stmtType) {
if (TSDB_CODE_SUCCESS == code && !pQuery->isCmd) {
code = setReslutSchema(&cxt, pQuery);
}
return code;
}
int32_t parser(SParseContext* pParseCxt, SQuery* pQuery) {
int32_t code = doParse(pParseCxt, pQuery);
int32_t parseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = doParse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = doTranslate(pParseCxt, pQuery);
code = doTranslate(pCxt, *pQuery);
}
return code;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -60,15 +60,12 @@ protected:
return code_;
}
SVnodeModifOpStmtInfo* reslut() {
return res_;
}
void dumpReslut() {
size_t num = taosArrayGetSize(res_->pDataBlocks);
cout << "schemaAttache:" << (int32_t)res_->schemaAttache << ", payloadType:" << (int32_t)res_->payloadType << ", insertType:" << res_->insertType << ", numOfVgs:" << num << endl;
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
cout << "schemaAttache:" << (int32_t)pStmt->schemaAttache << ", payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl;
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SSubmitReq* submit = (SSubmitReq*)vg->pData;
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
......@@ -84,13 +81,14 @@ protected:
}
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
ASSERT_EQ(res_->schemaAttache, 0);
ASSERT_EQ(res_->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(res_->insertType, TSDB_QUERY_TYPE_INSERT);
size_t num = taosArrayGetSize(res_->pDataBlocks);
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->schemaAttache, 0);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
ASSERT_GE(num, 0);
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
ASSERT_EQ(vg->numOfTables, numOfTables);
ASSERT_GE(vg->size, 0);
SSubmitReq* submit = (SSubmitReq*)vg->pData;
......@@ -115,7 +113,10 @@ private:
cxt_.pMsg = errMagBuf_;
cxt_.msgLen = max_err_len;
code_ = TSDB_CODE_SUCCESS;
res_ = nullptr;
}
SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) {
return (SVnodeModifOpStmt*)pQuery->pRoot;
}
string acctId_;
......@@ -124,7 +125,7 @@ private:
char sqlBuf_[max_sql_len];
SParseContext cxt_;
int32_t code_;
SVnodeModifOpStmtInfo* res_;
SQuery* res_;
};
// INSERT INTO tb_name VALUES (field1_value, ...)
......
......@@ -50,19 +50,19 @@ protected:
if (TSDB_CODE_SUCCESS != parseCode) {
return false;
}
code = doTranslate(&cxt_, &query_);
code = doTranslate(&cxt_, query_);
// cout << "doTranslate return " << code << endl;
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] code:" << code << ", " << translateCode << ", msg:" << errMagBuf_ << endl;
return (code == translateCode);
}
if (NULL != query_.pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_.pRoot)) {
if (NULL != query_->pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_->pRoot)) {
cout << "input sql : [" << cxt_.pSql << "]" << endl;
// string sql;
// selectToSql(query_.pRoot, sql);
// cout << "output sql : [" << sql << "]" << endl;
string str;
selectToStr(query_.pRoot, str);
selectToStr(query_->pRoot, str);
cout << "translate str : \n" << str << endl;
}
return (TSDB_CODE_SUCCESS == translateCode);
......@@ -507,7 +507,7 @@ private:
char errMagBuf_[max_err_len];
string sqlBuf_;
SParseContext cxt_;
SQuery query_;
SQuery* query_;
};
TEST_F(NewParserTest, selectSimple) {
......
此差异已折叠。
此差异已折叠。
#if 0
#include <gtest/gtest.h>
#include <iostream>
......@@ -723,4 +725,6 @@ TEST(testCase, extractMeta_test) {
destroySqlInfo(&info1);
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PLANNER_INT_H_
#define _TD_PLANNER_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
#include "tarray.h"
#include "planner.h"
#include "parser.h"
#include "tmsg.h"
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_STREAMSCAN 3
#define QNODE_PROJECT 4
#define QNODE_AGGREGATE 5
#define QNODE_GROUPBY 6
#define QNODE_LIMIT 7
#define QNODE_JOIN 8
#define QNODE_DISTINCT 9
#define QNODE_SORT 10
#define QNODE_UNION 11
#define QNODE_TIMEWINDOW 12
#define QNODE_SESSIONWINDOW 13
#define QNODE_STATEWINDOW 14
#define QNODE_FILL 15
#define QNODE_MODIFY 16
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
int32_t phase; // merge|partial
int32_t type; // operator type
char *name; // operator name
SEpSet *sourceEp; // data source epset
} SQueryDistPlanNodeInfo;
typedef struct SQueryTableInfo {
char *tableName; // to be deleted
uint64_t uid; // to be deleted
STableMetaInfo *pMeta;
STimeWindow window;
} SQueryTableInfo;
typedef struct SQueryPlanNode {
SQueryNodeBasicInfo info;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// children operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pChildren; // upstream nodes
struct SQueryPlanNode *pParent;
} SQueryPlanNode;
typedef struct SDataPayloadInfo {
int32_t msgType;
SArray *payload;
} SDataPayloadInfo;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
/**
* Create the query plan according to the bound AST, which is in the form of pQueryInfo
* @param pQueryInfo
* @param pQueryNode
* @return
*/
int32_t createQueryPlan(const SQueryNode* pNode, struct SQueryPlanNode** pQueryPlan);
/**
* Convert the query plan to string, in order to display it in the shell.
* @param pQueryNode
* @return
*/
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
/**
* Restore the SQL statement according to the logic query plan.
* @param pQueryNode
* @param sql
* @return
*/
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId);
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan);
/**
* Destroy the query plan object.
* @return
*/
void destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode);
const char* opTypeToOpName(int32_t type);
int32_t opNameToOpType(const char* name);
const char* dsinkTypeToDsinkName(int32_t type);
int32_t dsinkNameToDsinkType(const char* name);
#ifdef __cplusplus
}
#endif
#endif /*_TD_PLANNER_INT_H_*/
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
#include "tvariant.h"
#include "plannerUtil.h"
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册