diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 844757eeb5b4bacbeca03cf28eecc803f7924b90..a7d418d45e5dd2093188a5340ab241619623b3c8 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -84,8 +84,8 @@ typedef struct SProjectPhyNode { typedef struct SExchangePhyNode { SPhyNode node; - uint64_t templateId; - SArray *pSourceEpSet; // SEpSet + uint64_t srcTemplateId; // template id of datasource suplans + SArray *pSourceEpSet; // SEpSet, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; typedef struct SSubplanId { @@ -113,6 +113,8 @@ typedef struct SQueryDag { */ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); +int32_t qSetSuplanExecutionNode(SArray* subplans, SArray* nodes); + int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); @@ -126,7 +128,7 @@ int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str); * @param pQueryPhyNode * @return */ -void* qDestroyQueryDag(struct SQueryDag* pDag); +void qDestroyQueryDag(struct SQueryDag* pDag); #ifdef __cplusplus } diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 27c7c534a25235f3f902adab57d08870fb3cb2cf..9f51969dc1d00028a3fcf6a508966d7e787bdcb5 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -27,7 +27,7 @@ OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TagScan) -OP_ENUM_MACRO(TableBlockInfoScan) +OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(Aggregate) OP_ENUM_MACRO(Project) OP_ENUM_MACRO(Groupby) diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 6c65e4810df4dc177963a868c438a827aad80103..38b399fb0bdc388e18f103b385b453649f34eb0b 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -99,6 +99,8 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); */ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); + /** * Convert to physical plan to string to enable to print it out in the shell. * @param pPhyNode @@ -111,7 +113,7 @@ int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str); * Destroy the query plan object. * @return */ -void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode); +void destroyQueryPlan(struct SQueryPlanNode* pQueryNode); /** * Destroy the physical plan. diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c new file mode 100644 index 0000000000000000000000000000000000000000..ce923314bd9530b6425fff926cb3b9662f57b813 --- /dev/null +++ b/source/libs/planner/src/logicPlan.c @@ -0,0 +1,604 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "function.h" +#include "os.h" +#include "parser.h" +#include "plannerInt.h" + +typedef struct SFillEssInfo { + int32_t fillType; // fill type + int64_t *val; // fill value +} SFillEssInfo; + +typedef struct SJoinCond { + bool tagExists; // denote if tag condition exists or not + SColumn *tagCond[2]; + SColumn *colCond[2]; +} SJoinCond; + +static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo); +static void doDestroyQueryNode(SQueryPlanNode* pQueryNode); + +int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len); +int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { + return 0; +} + +int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { + SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo); + assert(taosArrayGetSize(upstream) == 1); + + *pQueryNode = taosArrayGetP(upstream, 0); + + taosArrayDestroy(upstream); + return TSDB_CODE_SUCCESS; +} + +int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { + return 0; +} + +void destroyQueryPlan(SQueryPlanNode* pQueryNode) { + if (pQueryNode == NULL) { + return; + } + + doDestroyQueryNode(pQueryNode); +} + +//====================================================================================================================== + +static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, + SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) { + SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); + + pNode->info.type = type; + pNode->info.name = strdup(name); + + pNode->numOfExpr = numOfOutput; + pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); + + for(int32_t i = 0; i < numOfOutput; ++i) { + taosArrayPush(pNode->pExpr, &pExpr[i]); + } + + pNode->pChildren = taosArrayInit(4, POINTER_BYTES); + for(int32_t i = 0; i < numOfPrev; ++i) { + taosArrayPush(pNode->pChildren, &prev[i]); + } + + switch(type) { + case QNODE_TAGSCAN: + case QNODE_TABLESCAN: { + SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); + memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); + info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName); + pNode->pExtInfo = info; + break; + } + + case QNODE_TIMEWINDOW: { + SInterval* pInterval = calloc(1, sizeof(SInterval)); + pNode->pExtInfo = pInterval; + memcpy(pInterval, pExtInfo, sizeof(SInterval)); + break; + } + + case QNODE_STATEWINDOW: { + SColumn* psw = calloc(1, sizeof(SColumn)); + pNode->pExtInfo = psw; + memcpy(psw, pExtInfo, sizeof(SColumn)); + break; + } + + case QNODE_SESSIONWINDOW: { + SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow)); + pNode->pExtInfo = pSessionWindow; + memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow)); + break; + } + + case QNODE_GROUPBY: { + SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo; + + SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr)); + pGroupbyExpr->groupbyTag = p->groupbyTag; + pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo); + + pNode->pExtInfo = pGroupbyExpr; + break; + } + + case QNODE_FILL: { // todo !! + pNode->pExtInfo = pExtInfo; + break; + } + + case QNODE_LIMIT: { + pNode->pExtInfo = calloc(1, sizeof(SLimit)); + memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimit)); + break; + } + + case QNODE_SORT: { + pNode->pExtInfo = taosArrayDup(pExtInfo); + break; + } + + default: + break; + } + + return pNode; +} + +static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, + SArray* pExprs, SArray* tableCols) { + if (pQueryInfo->info.onlyTagQuery) { + int32_t num = (int32_t) taosArrayGetSize(pExprs); + SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); + + if (pQueryInfo->info.distinct) { + pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); + } + + return pNode; + } + + SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); + + if (pQueryInfo->info.projectionQuery) { + int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL); + } else { + STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); + + // table source column projection, generate the projection expr + int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols); + SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumn* pCol = taosArrayGetP(tableCols, i); + + SSourceParam param = {0}; + addIntoSourceParam(¶m, NULL, pCol); + SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name); + SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, &s, 0); + pExpr[i] = p; + } + + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); + tfree(pExpr); + } + + return pNode; +} + +static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) { + // group by column not by tag + size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo); + + // check for aggregation + int32_t level = getExprFunctionLevel(pQueryInfo); + + for(int32_t i = level - 1; i >= 0; --i) { + SArray* p = pQueryInfo->exprList[i]; + size_t num = taosArrayGetSize(p); + + bool aggregateFunc = false; + for(int32_t j = 0; j < num; ++j) { + SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0); + if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) { + continue; + } + + aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName); + if (aggregateFunc) { + break; + } + } + + if (aggregateFunc) { + if (pQueryInfo->interval.interval > 0) { + pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval); + } else if (pQueryInfo->sessionWindow.gap > 0) { + pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow); + } else if (pQueryInfo->stateWindow.col.info.colId > 0) { + pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow); + } else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) { + pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr); + } else { + pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); + } + } else { + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); + } + } + + if (pQueryInfo->havingFieldNum > 0) { +// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1); +// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL); + } + + if (pQueryInfo->fillType != TSDB_FILL_NONE) { + SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo)); + pInfo->fillType = pQueryInfo->fillType; + pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t)); + memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr); + + SArray* p = pQueryInfo->exprList[0]; // top expression in select clause + pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo); + } + + if (pQueryInfo->order != NULL) { + SArray* pList = pQueryInfo->exprList[0]; + pNode = createQueryNode(QNODE_SORT, "Sort", &pNode, 1, pList->pData, taosArrayGetSize(pList), pQueryInfo->order); + } + + if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) { + pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit); + } + + return pNode; +} + +static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs, + SArray* tableCols) { + char name[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN); + + SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; + + // handle the only tag query + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols); + if (pQueryInfo->info.onlyTagQuery) { + tfree(info.tableName); + return pNode; + } + + SQueryPlanNode* pNode1 = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); + tfree(info.tableName); + return pNode1; +} + +static bool isAllAggExpr(SArray* pList) { + assert(pList != NULL); + + for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) { + SExprInfo* p = taosArrayGetP(pList, k); + if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) { + return false; + } + } + + return true; +} + +SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { + SArray* upstream = NULL; + + if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause + upstream = taosArrayInit(4, POINTER_BYTES); + + size_t size = taosArrayGetSize(pQueryInfo->pUpstream); + for(int32_t i = 0; i < size; ++i) { + SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); + SArray* p = createQueryPlanImpl(pq); + taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); + } + } + + if (pQueryInfo->numOfTables > 1) { // it is a join query + // 1. separate the select clause according to table + taosArrayDestroy(upstream); + upstream = taosArrayInit(5, POINTER_BYTES); + + for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i]; + uint64_t uid = pTableMetaInfo->pTableMeta->uid; + + SArray* exprList = taosArrayInit(4, POINTER_BYTES); + if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; +// dropAllExprInfo(exprList); + exit(-1); + } + + // 2. create the query execution node + char name[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, name); + SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; + + // 3. get the required table column list + SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn)); + columnListCopy(tableColumnList, pQueryInfo->colList, uid); + + // 4. add the projection query node + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList); + columnListDestroy(tableColumnList); +// dropAllExprInfo(exprList); + taosArrayPush(upstream, &pNode); + } + + // 3. add the join node here + SQueryTableInfo info = {0}; + int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); + SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, + pQueryInfo->exprList[0]->pData, num, NULL); + + // 4. add the aggregation or projection execution node + pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); + upstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(upstream, &pNode); + } else { // only one table, normal query process + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; + SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); + upstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(upstream, &pNode); + } + + return upstream; +} + +static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { + tfree(pQueryNode->pExtInfo); + tfree(pQueryNode->pSchema); + tfree(pQueryNode->info.name); +// dropAllExprInfo(pQueryNode->pExpr); + + if (pQueryNode->pChildren != NULL) { + int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); + for(int32_t i = 0; i < size; ++i) { + SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i); + doDestroyQueryNode(p); + } + + taosArrayDestroy(pQueryNode->pChildren); + } + + tfree(pQueryNode); +} + +static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { + if (level > 0) { + sprintf(buf + totalLen, "%*c", level, ' '); + totalLen += level; + } + + int32_t len1 = sprintf(buf + totalLen, "%s(", pQueryNode->info.name); + int32_t len = len1 + totalLen; + + switch(pQueryNode->info.type) { + case QNODE_TABLESCAN: { + SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid, + pInfo->window.skey, pInfo->window.ekey); + assert(len1 > 0); + len += len1; + + for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { + SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i); + len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId); + + assert(len1 > 0); + len += len1; + } + + len1 = sprintf(buf + len, "\n"); + assert(len1 > 0); + + len += len1; + break; + } + + case QNODE_PROJECT: { + len1 = sprintf(buf + len, "cols:"); + assert(len1 > 0); + len += len1; + + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ")"); + len += len1; + + // todo print filter info + len1 = sprintf(buf + len, " filters:(nil)\n"); + len += len1; + break; + } + + case QNODE_AGGREGATE: { + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ")\n"); + len += len1; + + break; + } + + case QNODE_TIMEWINDOW: { + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ") "); + len += len1; + + SInterval* pInterval = pQueryNode->pExtInfo; + + // todo dynamic return the time precision + len1 = sprintf(buf + len, "interval:%" PRId64 "(%s), sliding:%" PRId64 "(%s), offset:%" PRId64 "(%s)\n", + pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding, + TSDB_TIME_PRECISION_MILLI_STR, pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR); + len += len1; + + break; + } + + case QNODE_STATEWINDOW: { + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ") "); + len += len1; + + SColumn* pCol = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId); + len += len1; + break; + } + + case QNODE_SESSIONWINDOW: { + len = printExprInfo(buf, pQueryNode, len); + + len1 = sprintf(buf + len, ") "); + len += len1; + + struct SSessionWindow* ps = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "col:[%s #%d], gap:%" PRId64 " (ms) \n", ps->col.name, ps->col.info.colId, ps->gap); + len += len1; + break; + } + + case QNODE_GROUPBY: { + len = printExprInfo(buf, pQueryNode, len); + + SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, ") groupby_col: "); + len += len1; + + for (int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) { + SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i); + len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId); + len += len1; + } + + len += sprintf(buf + len, "\n"); + break; + } + + case QNODE_FILL: { + SFillEssInfo* pEssInfo = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "%d", pEssInfo->fillType); + len += len1; + + if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) { + len1 = sprintf(buf + len, ", val:"); + len += len1; + + // todo get the correct fill data type + for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { + len1 = sprintf(buf + len, "%" PRId64, pEssInfo->val[i]); + len += len1; + + if (i < pQueryNode->numOfExpr - 1) { + len1 = sprintf(buf + len, ", "); + len += len1; + } + } + } + + len1 = sprintf(buf + len, ")\n"); + len += len1; + break; + } + + case QNODE_LIMIT: { + SLimit* pVal = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "limit: %" PRId64 ", offset: %" PRId64 ")\n", pVal->limit, pVal->offset); + len += len1; + break; + } + + case QNODE_DISTINCT: + case QNODE_TAGSCAN: { + len1 = sprintf(buf + len, "cols: "); + len += len1; + + len = printExprInfo(buf, pQueryNode, len); + + len1 = sprintf(buf + len, ")\n"); + len += len1; + + break; + } + + case QNODE_SORT: { + len1 = sprintf(buf + len, "cols:"); + len += len1; + + SArray* pSort = pQueryNode->pExtInfo; + for (int32_t i = 0; i < taosArrayGetSize(pSort); ++i) { + SOrder* p = taosArrayGet(pSort, i); + len1 = sprintf(buf + len, " [%s #%d %s]", p->col.name, p->col.info.colId, p->order == TSDB_ORDER_ASC? "ASC":"DESC"); + + len += len1; + } + + len1 = sprintf(buf + len, ")\n"); + len += len1; + break; + } + + case QNODE_JOIN: { + // print join condition + len1 = sprintf(buf + len, ")\n"); + len += len1; + break; + } + } + + return len; +} + +int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len) { + int32_t len1 = 0; + + for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { + SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i); + + SSqlExpr* pExpr = &pExprInfo->base; + len1 = sprintf(buf + len, "%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId); + assert(len1 > 0); + + len += len1; + if (i < pQueryNode->numOfExpr - 1) { + len1 = sprintf(buf + len, ", "); + len += len1; + } + } + + return len; +} + +int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { + int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); + + for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) { + SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i); + int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); + len = len1; + } + + return len; +} + +int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { + assert(pQueryNode); + + *str = calloc(1, 4096); + + int32_t len = sprintf(*str, "===== logic plan =====\n"); + queryPlanToStringImpl(*str, pQueryNode, 0, len); + + return TSDB_CODE_SUCCESS; +} + +SQueryPlanNode* queryPlanFromString() { + return NULL; +} diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index e7acb12bc0ac9441249eec5f553ca22f0338b2aa..277703f5c0b65618a122f566884394f976f62837 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -16,6 +16,9 @@ #include "plannerInt.h" #include "parser.h" +#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan +#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _ + static const char* gOpName[] = { "Unknown", #define INCLUDE_AS_NAME @@ -43,8 +46,56 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si toDataBlockSchema(pPlanNode, &(node->targetSchema)); } +static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) { + SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size); + node->uid = pTable->pMeta->pTableMeta->uid; + node->tableType = pTable->pMeta->pTableMeta->tableType; + return (SPhyNode*)node; +} + +static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) { + return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode)); +} + static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { - return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode)); + SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; + return createPseudoScanNode(pPlanNode, pTable, OP_TagScan); +} + +static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + // todo + return MASTER_SCAN; +} + +static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) { + STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode)); + node->scanFlag = getScanFlag(pPlanNode, pTable); + node->window = pTable->window; + // todo tag cond + return (SPhyNode*)node; +} + +static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); +} + +static bool isSystemTable(SQueryTableInfo* pTable) { + // todo + return false; +} + +static bool needSeqScan(SQueryPlanNode* pPlanNode) { + // todo + return false; +} + +static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + if (isSystemTable(pTable)) { + return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan); + } else if (needSeqScan(pPlanNode)) { + return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); + } + return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan); } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { @@ -58,53 +109,61 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { if (NULL == pCxt->pCurrentSubplan->pChildern) { pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); } - taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan); + taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); - taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan); + taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); + } + SArray* currentLevel; + if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { + currentLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel); + } else { + currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); } + taosArrayPush(currentLevel, &subplan); pCxt->pCurrentSubplan = subplan; return subplan; } -static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) { - // todo - return MASTER_SCAN; -} - -static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { - STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode)); - node->scan.uid = pTable->pMeta->pTableMeta->uid; - node->scan.tableType = pTable->pMeta->pTableMeta->tableType; - node->scanFlag = getScanFlag(pPlanNode); - node->window = pTable->window; - // todo tag cond -} - static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) { - // todo + epSet->inUse = 0; // todo + epSet->numOfEps = vg->numOfEps; + for (int8_t i = 0; i < vg->numOfEps; ++i) { + epSet->port[i] = vg->epAddr[i].port; + strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn); + } + return; } -static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { +static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { + STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet); - subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable); - // todo reset pCxt->pCurrentSubplan + subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); + RECOVERY_CURRENT_SUBPLAN(pCxt); } + return pCxt->nextId.templateId++; } -static SPhyNode* createExchangeNode() { +static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) { + SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode)); + node->srcTemplateId = srcTemplateId; + return (SPhyNode*)node; +} +static bool needMultiNodeScan(SQueryTableInfo* pTable) { + // todo system table, for instance, user_tables + return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType); } -static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { +static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; - if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) { - splitSubplanBySTable(pCxt, pPlanNode, pTable); - return createExchangeNode(pCxt, pTable); + if (needMultiNodeScan(pTable)) { + return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } - return createTableScanNode(pCxt, pPlanNode, pTable); + return createSingleTableScanNode(pPlanNode, pTable); } static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -114,7 +173,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { node = createTagScanNode(pPlanNode); break; case QNODE_TABLESCAN: - node = createScanNode(pCxt, pPlanNode); + node = createTableScanNode(pCxt, pPlanNode); break; default: assert(false); @@ -133,6 +192,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); + ++(pCxt->nextId.templateId); subplan->pNode = createPhyNode(pCxt, pRoot); SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); taosArrayPush(l0, &subplan); @@ -144,7 +204,8 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD SPlanContext context = { .pCatalog = pCatalog, .pDag = calloc(1, sizeof(SQueryDag)), - .pCurrentSubplan = NULL + .pCurrentSubplan = NULL, + .nextId = {0} // todo queryid }; if (NULL == context.pDag) { return TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 19aac36e780dd2308fedd8907cc861f1e84be1d6..744a849e2d55e1293f35f2707acb000e3a8f4221 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -13,606 +13,31 @@ * along with this program. If not, see . */ -#include "function.h" -#include "os.h" #include "parser.h" #include "plannerInt.h" -typedef struct SFillEssInfo { - int32_t fillType; // fill type - int64_t *val; // fill value -} SFillEssInfo; - -typedef struct SJoinCond { - bool tagExists; // denote if tag condition exists or not - SColumn *tagCond[2]; - SColumn *colCond[2]; -} SJoinCond; - -static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo); -static void doDestroyQueryNode(SQueryPlanNode* pQueryNode); - -int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len); -int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { - return 0; -} - -int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { - SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo); - assert(taosArrayGetSize(upstream) == 1); - - *pQueryNode = taosArrayGetP(upstream, 0); - - taosArrayDestroy(upstream); - return TSDB_CODE_SUCCESS; -} - -int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { - return 0; -} - -int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { - - return 0; -} - -int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) { - return 0; -} - -void* destroyQueryPlan(SQueryPlanNode* pQueryNode) { - if (pQueryNode == NULL) { - return NULL; - } - - doDestroyQueryNode(pQueryNode); - return NULL; -} - -void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) { - return NULL; -} - -//====================================================================================================================== - -static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, - SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) { - SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); - - pNode->info.type = type; - pNode->info.name = strdup(name); - - pNode->numOfExpr = numOfOutput; - pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); - - for(int32_t i = 0; i < numOfOutput; ++i) { - taosArrayPush(pNode->pExpr, &pExpr[i]); - } - - pNode->pChildren = taosArrayInit(4, POINTER_BYTES); - for(int32_t i = 0; i < numOfPrev; ++i) { - taosArrayPush(pNode->pChildren, &prev[i]); - } - - switch(type) { - case QNODE_TAGSCAN: - case QNODE_TABLESCAN: { - SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); - memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); - info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName); - pNode->pExtInfo = info; - break; - } - - case QNODE_TIMEWINDOW: { - SInterval* pInterval = calloc(1, sizeof(SInterval)); - pNode->pExtInfo = pInterval; - memcpy(pInterval, pExtInfo, sizeof(SInterval)); - break; - } - - case QNODE_STATEWINDOW: { - SColumn* psw = calloc(1, sizeof(SColumn)); - pNode->pExtInfo = psw; - memcpy(psw, pExtInfo, sizeof(SColumn)); - break; - } - - case QNODE_SESSIONWINDOW: { - SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow)); - pNode->pExtInfo = pSessionWindow; - memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow)); - break; - } - - case QNODE_GROUPBY: { - SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo; - - SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr)); - pGroupbyExpr->groupbyTag = p->groupbyTag; - pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo); - - pNode->pExtInfo = pGroupbyExpr; - break; - } - - case QNODE_FILL: { // todo !! - pNode->pExtInfo = pExtInfo; - break; - } - - case QNODE_LIMIT: { - pNode->pExtInfo = calloc(1, sizeof(SLimit)); - memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimit)); - break; - } - - case QNODE_SORT: { - pNode->pExtInfo = taosArrayDup(pExtInfo); - break; - } - - default: - break; - } - - return pNode; -} - -static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, - SArray* pExprs, SArray* tableCols) { - if (pQueryInfo->info.onlyTagQuery) { - int32_t num = (int32_t) taosArrayGetSize(pExprs); - SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); - - if (pQueryInfo->info.distinct) { - pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); - } - - return pNode; - } - - SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); - - if (pQueryInfo->info.projectionQuery) { - int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL); - } else { - STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); - - // table source column projection, generate the projection expr - int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols); - SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumn* pCol = taosArrayGetP(tableCols, i); - - SSourceParam param = {0}; - addIntoSourceParam(¶m, NULL, pCol); - SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name); - SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, &s, 0); - pExpr[i] = p; - } - - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); - tfree(pExpr); - } - - return pNode; -} - -static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) { - // group by column not by tag - size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo); - - // check for aggregation - int32_t level = getExprFunctionLevel(pQueryInfo); - - for(int32_t i = level - 1; i >= 0; --i) { - SArray* p = pQueryInfo->exprList[i]; - size_t num = taosArrayGetSize(p); - - bool aggregateFunc = false; - for(int32_t j = 0; j < num; ++j) { - SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0); - if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) { - continue; - } - - aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName); - if (aggregateFunc) { - break; - } - } - - if (aggregateFunc) { - if (pQueryInfo->interval.interval > 0) { - pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval); - } else if (pQueryInfo->sessionWindow.gap > 0) { - pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow); - } else if (pQueryInfo->stateWindow.col.info.colId > 0) { - pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow); - } else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) { - pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr); - } else { - pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); - } - } else { - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); - } - } - - if (pQueryInfo->havingFieldNum > 0) { -// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1); -// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL); - } - - if (pQueryInfo->fillType != TSDB_FILL_NONE) { - SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo)); - pInfo->fillType = pQueryInfo->fillType; - pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t)); - memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr); - - SArray* p = pQueryInfo->exprList[0]; // top expression in select clause - pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo); - } - - if (pQueryInfo->order != NULL) { - SArray* pList = pQueryInfo->exprList[0]; - pNode = createQueryNode(QNODE_SORT, "Sort", &pNode, 1, pList->pData, taosArrayGetSize(pList), pQueryInfo->order); - } - - if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) { - pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit); - } - - return pNode; -} - -static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs, - SArray* tableCols) { - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN); - - SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; - - // handle the only tag query - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols); - if (pQueryInfo->info.onlyTagQuery) { - tfree(info.tableName); - return pNode; - } - - SQueryPlanNode* pNode1 = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); - tfree(info.tableName); - return pNode1; -} - -static bool isAllAggExpr(SArray* pList) { - assert(pList != NULL); - - for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) { - SExprInfo* p = taosArrayGetP(pList, k); - if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) { - return false; - } - } - - return true; +void qDestroyQueryDag(struct SQueryDag* pDag) { + // todo } -SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { - SArray* upstream = NULL; - - if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause - upstream = taosArrayInit(4, POINTER_BYTES); - - size_t size = taosArrayGetSize(pQueryInfo->pUpstream); - for(int32_t i = 0; i < size; ++i) { - SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); - SArray* p = createQueryPlanImpl(pq); - taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); - } +int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag) { + SQueryPlanNode* logicPlan; + int32_t code = createQueryPlan(pQueryInfo, &logicPlan); + if (TSDB_CODE_SUCCESS != code) { + destroyQueryPlan(logicPlan); + return code; } - - if (pQueryInfo->numOfTables > 1) { // it is a join query - // 1. separate the select clause according to table - taosArrayDestroy(upstream); - upstream = taosArrayInit(5, POINTER_BYTES); - - for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i]; - uint64_t uid = pTableMetaInfo->pTableMeta->uid; - - SArray* exprList = taosArrayInit(4, POINTER_BYTES); - if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; -// dropAllExprInfo(exprList); - exit(-1); - } - - // 2. create the query execution node - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; - - // 3. get the required table column list - SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn)); - columnListCopy(tableColumnList, pQueryInfo->colList, uid); - - // 4. add the projection query node - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList); - columnListDestroy(tableColumnList); -// dropAllExprInfo(exprList); - taosArrayPush(upstream, &pNode); - } - - // 3. add the join node here - SQueryTableInfo info = {0}; - int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); - SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, - pQueryInfo->exprList[0]->pData, num, NULL); - - // 4. add the aggregation or projection execution node - pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); - } else { // only one table, normal query process - STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; - SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); - } - - return upstream; -} - -static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { - tfree(pQueryNode->pExtInfo); - tfree(pQueryNode->pSchema); - tfree(pQueryNode->info.name); -// dropAllExprInfo(pQueryNode->pExpr); - - if (pQueryNode->pChildren != NULL) { - int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); - for(int32_t i = 0; i < size; ++i) { - SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i); - doDestroyQueryNode(p); - } - - taosArrayDestroy(pQueryNode->pChildren); + code = optimizeQueryPlan(logicPlan); + if (TSDB_CODE_SUCCESS != code) { + destroyQueryPlan(logicPlan); + return code; } - - tfree(pQueryNode); -} - -static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { - if (level > 0) { - sprintf(buf + totalLen, "%*c", level, ' '); - totalLen += level; + code = createDag(logicPlan, NULL, pDag); + if (TSDB_CODE_SUCCESS != code) { + destroyQueryPlan(logicPlan); + qDestroyQueryDag(*pDag); + return code; } - - int32_t len1 = sprintf(buf + totalLen, "%s(", pQueryNode->info.name); - int32_t len = len1 + totalLen; - - switch(pQueryNode->info.type) { - case QNODE_TABLESCAN: { - SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid, - pInfo->window.skey, pInfo->window.ekey); - assert(len1 > 0); - len += len1; - - for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { - SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i); - len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId); - - assert(len1 > 0); - len += len1; - } - - len1 = sprintf(buf + len, "\n"); - assert(len1 > 0); - - len += len1; - break; - } - - case QNODE_PROJECT: { - len1 = sprintf(buf + len, "cols:"); - assert(len1 > 0); - len += len1; - - len = printExprInfo(buf, pQueryNode, len); - len1 = sprintf(buf + len, ")"); - len += len1; - - // todo print filter info - len1 = sprintf(buf + len, " filters:(nil)\n"); - len += len1; - break; - } - - case QNODE_AGGREGATE: { - len = printExprInfo(buf, pQueryNode, len); - len1 = sprintf(buf + len, ")\n"); - len += len1; - - break; - } - - case QNODE_TIMEWINDOW: { - len = printExprInfo(buf, pQueryNode, len); - len1 = sprintf(buf + len, ") "); - len += len1; - - SInterval* pInterval = pQueryNode->pExtInfo; - - // todo dynamic return the time precision - len1 = sprintf(buf + len, "interval:%" PRId64 "(%s), sliding:%" PRId64 "(%s), offset:%" PRId64 "(%s)\n", - pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding, - TSDB_TIME_PRECISION_MILLI_STR, pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR); - len += len1; - - break; - } - - case QNODE_STATEWINDOW: { - len = printExprInfo(buf, pQueryNode, len); - len1 = sprintf(buf + len, ") "); - len += len1; - - SColumn* pCol = pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId); - len += len1; - break; - } - - case QNODE_SESSIONWINDOW: { - len = printExprInfo(buf, pQueryNode, len); - - len1 = sprintf(buf + len, ") "); - len += len1; - - struct SSessionWindow* ps = pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "col:[%s #%d], gap:%" PRId64 " (ms) \n", ps->col.name, ps->col.info.colId, ps->gap); - len += len1; - break; - } - - case QNODE_GROUPBY: { - len = printExprInfo(buf, pQueryNode, len); - - SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo; - len1 = sprintf(buf + len, ") groupby_col: "); - len += len1; - - for (int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) { - SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i); - len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId); - len += len1; - } - - len += sprintf(buf + len, "\n"); - break; - } - - case QNODE_FILL: { - SFillEssInfo* pEssInfo = pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "%d", pEssInfo->fillType); - len += len1; - - if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) { - len1 = sprintf(buf + len, ", val:"); - len += len1; - - // todo get the correct fill data type - for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { - len1 = sprintf(buf + len, "%" PRId64, pEssInfo->val[i]); - len += len1; - - if (i < pQueryNode->numOfExpr - 1) { - len1 = sprintf(buf + len, ", "); - len += len1; - } - } - } - - len1 = sprintf(buf + len, ")\n"); - len += len1; - break; - } - - case QNODE_LIMIT: { - SLimit* pVal = pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "limit: %" PRId64 ", offset: %" PRId64 ")\n", pVal->limit, pVal->offset); - len += len1; - break; - } - - case QNODE_DISTINCT: - case QNODE_TAGSCAN: { - len1 = sprintf(buf + len, "cols: "); - len += len1; - - len = printExprInfo(buf, pQueryNode, len); - - len1 = sprintf(buf + len, ")\n"); - len += len1; - - break; - } - - case QNODE_SORT: { - len1 = sprintf(buf + len, "cols:"); - len += len1; - - SArray* pSort = pQueryNode->pExtInfo; - for (int32_t i = 0; i < taosArrayGetSize(pSort); ++i) { - SOrder* p = taosArrayGet(pSort, i); - len1 = sprintf(buf + len, " [%s #%d %s]", p->col.name, p->col.info.colId, p->order == TSDB_ORDER_ASC? "ASC":"DESC"); - - len += len1; - } - - len1 = sprintf(buf + len, ")\n"); - len += len1; - break; - } - - case QNODE_JOIN: { - // print join condition - len1 = sprintf(buf + len, ")\n"); - len += len1; - break; - } - } - - return len; -} - -int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len) { - int32_t len1 = 0; - - for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { - SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i); - - SSqlExpr* pExpr = &pExprInfo->base; - len1 = sprintf(buf + len, "%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId); - assert(len1 > 0); - - len += len1; - if (i < pQueryNode->numOfExpr - 1) { - len1 = sprintf(buf + len, ", "); - len += len1; - } - } - - return len; -} - -int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { - int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); - - for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) { - SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i); - int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); - len = len1; - } - - return len; -} - -int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { - assert(pQueryNode); - - *str = calloc(1, 4096); - - int32_t len = sprintf(*str, "===== logic plan =====\n"); - queryPlanToStringImpl(*str, pQueryNode, 0, len); - + destroyQueryPlan(logicPlan); return TSDB_CODE_SUCCESS; } - -SQueryPlanNode* queryPlanFromString() { - return NULL; -}