未验证 提交 d3132dcf 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11023 from taosdata/feature/3.0_wxy

nested subqueries and sort plan impl
......@@ -89,6 +89,7 @@ tests/examples/JDBC/JDBCDemo/.project
tests/examples/JDBC/JDBCDemo/.settings/
source/libs/parser/inc/sql.*
tests/script/tmqResult.txt
tests/tmqResult.txt
# Emacs
# -*- mode: gitignore; -*-
......
......@@ -120,6 +120,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_SORT,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
......
......@@ -66,6 +66,7 @@ typedef struct SAggLogicNode {
typedef struct SProjectLogicNode {
SLogicNode node;
SNodeList* pProjections;
char stmtName[TSDB_TABLE_NAME_LEN];
} SProjectLogicNode;
typedef struct SVnodeModifLogicNode {
......@@ -97,8 +98,14 @@ typedef struct SWindowLogicNode {
int8_t slidingUnit;
SFillNode* pFill;
int64_t sessionGap;
SNode* pTspk;
} SWindowLogicNode;
typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
} SSortLogicNode;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
......@@ -200,7 +207,7 @@ typedef struct SJoinPhysiNode {
typedef struct SAggPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList* pGroupKeys; // SColumnRefNode list
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
} SAggPhysiNode;
......@@ -225,6 +232,7 @@ typedef struct SWinodwPhysiNode {
typedef struct SIntervalPhysiNode {
SWinodwPhysiNode window;
SNode* pTspk; // timestamp primary key
int64_t interval;
int64_t offset;
int64_t sliding;
......@@ -238,6 +246,12 @@ typedef struct SSessionWinodwPhysiNode {
int64_t gap;
} SSessionWinodwPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
} SSortPhysiNode;
typedef struct SDataSinkNode {
ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc;
......
......@@ -191,12 +191,13 @@ typedef struct SStateWindowNode {
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
SNode* pCol;
SNode* pCol; // timestamp primary key
SNode* pGap; // gap between two session window(in microseconds)
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
SNode* pCol; // timestamp primary key
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
......@@ -231,6 +232,7 @@ typedef struct SSelectStmt {
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
SNode* pSlimit;
char stmtName[TSDB_TABLE_NAME_LEN];
} SSelectStmt;
typedef enum ESetOperatorType {
......
......@@ -415,6 +415,7 @@ typedef struct STableScanInfo {
int32_t* rowCellInfoOffset;
SExprInfo* pExpr;
SSDataBlock block;
SArray* pColMatchInfo;
int32_t numOfOutput;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
......@@ -648,8 +649,8 @@ typedef struct SDistinctOperatorInfo {
} SDistinctOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
......
......@@ -66,6 +66,11 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
typedef struct SColMatchInfo {
int32_t colId;
int32_t targetSlotId;
} SColMatchInfo;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
......@@ -2944,12 +2949,21 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
*status = BLK_DATA_ALL_NEEDED;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pBlock->pDataBlock == NULL) {
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pCols == NULL) {
return terrno;
} else {
return TSDB_CODE_SUCCESS;
}
int32_t numOfCols = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
ASSERT(pColMatchInfo->colId == p->info.colId);
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
}
return TSDB_CODE_SUCCESS;
}
int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
......@@ -5374,7 +5388,8 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
return pResBlock;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
......@@ -5387,12 +5402,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return NULL;
}
pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {0};
taosArrayPush(pInfo->block.pDataBlock, &idata);
}
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
......@@ -8569,6 +8591,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
......@@ -8577,7 +8600,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
......@@ -8717,9 +8742,14 @@ SArray* extractScanColumnId(SNodeList* pNodeList) {
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
taosArrayPush(pList, &pColNode->colId);
for (int32_t j = 0; j < numOfCols; ++j) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, j);
if (pNode->slotId == i) {
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
taosArrayPush(pList, &pColNode->colId);
break;
}
}
}
return pList;
......@@ -8751,6 +8781,28 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList;
}
SArray* extractColMatchInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
SColMatchInfo c = {0};
c.colId = pColNode->colId;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
}
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
if (tableType == TSDB_SUPER_TABLE) {
......
......@@ -19,6 +19,11 @@
#include "taos.h"
#include "taoserror.h"
#define COPY_ALL_SCALAR_FIELDS \
do { \
memcpy((pDst), (pSrc), sizeof(*pSrc)); \
} while (0)
#define COPY_SCALAR_FIELD(fldname) \
do { \
(pDst)->fldname = (pSrc)->fldname; \
......@@ -195,6 +200,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return (SNode*)pDst;
}
static SNode* orderByExprNodeCopy(const SOrderByExprNode* pSrc, SOrderByExprNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
CLONE_NODE_FIELD(pExpr);
return (SNode*)pDst;
}
static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
COPY_SCALAR_FIELD(mode);
CLONE_NODE_FIELD(pValues);
......@@ -251,6 +262,7 @@ static SNode* logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pProjections);
COPY_CHAR_ARRAY_FIELD(stmtName);
return (SNode*)pDst;
}
......@@ -267,16 +279,24 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
}
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(winType);
// COPY_SCALAR_FIELD(winType);
CLONE_NODE_LIST_FIELD(pFuncs);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
// COPY_SCALAR_FIELD(interval);
// COPY_SCALAR_FIELD(offset);
// COPY_SCALAR_FIELD(sliding);
// COPY_SCALAR_FIELD(intervalUnit);
// COPY_SCALAR_FIELD(slidingUnit);
CLONE_NODE_FIELD(pFill);
COPY_SCALAR_FIELD(sessionGap);
// COPY_SCALAR_FIELD(sessionGap);
CLONE_NODE_FIELD(pTspk);
return (SNode*)pDst;
}
static SNode* logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
return (SNode*)pDst;
}
......@@ -339,6 +359,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_GROUPING_SET:
return groupingSetNodeCopy((const SGroupingSetNode*)pNode, (SGroupingSetNode*)pDst);
case QUERY_NODE_ORDER_BY_EXPR:
return orderByExprNodeCopy((const SOrderByExprNode*)pNode, (SOrderByExprNode*)pDst);
case QUERY_NODE_LIMIT:
break;
case QUERY_NODE_FILL:
......@@ -361,6 +382,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_SORT:
return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
default:
......
......@@ -934,6 +934,37 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanSortKeys, pNode->pSortKeys);
}
return code;
}
static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
SSortPhysiNode* pNode = (SSortPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanSortKeys, &pNode->pSortKeys);
}
return code;
}
static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs";
......@@ -971,6 +1002,7 @@ static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkIntervalPhysiPlanFill = "Fill";
static const char* jkIntervalPhysiPlanTsPk = "TsPk";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
......@@ -994,6 +1026,9 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
return code;
}
......@@ -1020,6 +1055,9 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
return code;
}
......@@ -1821,6 +1859,38 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static const char* jkOrderByExprExpr = "Expr";
static const char* jkOrderByExprOrder = "Order";
static const char* jkOrderByExprNullOrder = "NullOrder";
static int32_t orderByExprNodeToJson(const void* pObj, SJson* pJson) {
const SOrderByExprNode* pNode = (const SOrderByExprNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkOrderByExprExpr, nodeToJson, pNode->pExpr);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkOrderByExprOrder, pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkOrderByExprNullOrder, pNode->nullOrder);
}
return code;
}
static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
SOrderByExprNode* pNode = (SOrderByExprNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder);
}
return code;
}
static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
......@@ -2169,6 +2239,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_GROUPING_SET:
return groupingSetNodeToJson(pObj, pJson);
case QUERY_NODE_ORDER_BY_EXPR:
return orderByExprNodeToJson(pObj, pJson);
case QUERY_NODE_LIMIT:
case QUERY_NODE_STATE_WINDOW:
case QUERY_NODE_SESSION_WINDOW:
......@@ -2232,7 +2303,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
break;
return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
......@@ -2272,7 +2343,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// break;
// case QUERY_NODE_GROUPING_SET:
// return jsonToGroupingSetNode(pJson, pObj);
// case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_ORDER_BY_EXPR:
return jsonToOrderByExprNode(pJson, pObj);
// case QUERY_NODE_LIMIT:
// case QUERY_NODE_STATE_WINDOW:
// case QUERY_NODE_SESSION_WINDOW:
......@@ -2321,6 +2393,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
......
......@@ -99,6 +99,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
if (DEAL_RES_ERROR != res) {
res = walkNode(pInterval->pFill, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
res = walkNode(pInterval->pCol, order, walker, pContext);
}
break;
}
case QUERY_NODE_NODE_LIST:
......@@ -225,6 +228,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext);
}
break;
}
case QUERY_NODE_NODE_LIST:
......@@ -294,10 +300,10 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa
case SQL_CLAUSE_GROUP_BY:
nodesWalkNode(pSelect->pHaving, walker, pContext);
case SQL_CLAUSE_HAVING:
nodesWalkList(pSelect->pProjectionList, walker, pContext);
case SQL_CLAUSE_SELECT:
nodesWalkList(pSelect->pOrderByList, walker, pContext);
case SQL_CLAUSE_ORDER_BY:
nodesWalkList(pSelect->pProjectionList, walker, pContext);
case SQL_CLAUSE_SELECT:
default:
break;
}
......
......@@ -159,6 +159,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_PLAN_SORT:
return makeNode(type, sizeof(SSortLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SLogicSubplan));
case QUERY_NODE_LOGIC_PLAN:
......@@ -182,7 +184,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SNode));
return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
......@@ -555,7 +557,7 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
int32_t colId = pCol->colId;
if (0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) {
if (NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) {
return doCollect(pCxt, colId, pNode);
}
}
......
......@@ -30,6 +30,8 @@ extern "C" {
#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__)
#define PK_TS_COL_INTERNAL_NAME "_rowts"
typedef struct SMsgBuf {
int32_t len;
char *buf;
......
......@@ -645,6 +645,11 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
tempTable->pSubquery = pSubquery;
if (NULL != pTableAlias && TK_NK_NIL != pTableAlias->type) {
strncpy(tempTable->table.tableAlias, pTableAlias->z, pTableAlias->n);
} else {
sprintf(tempTable->table.tableAlias, "%p", tempTable);
}
if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) {
strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias);
}
return (SNode*)tempTable;
}
......@@ -697,6 +702,13 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol) {
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill) {
SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
CHECK_OUT_OF_MEM(interval);
interval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == interval->pCol) {
nodesDestroyNode(interval);
CHECK_OUT_OF_MEM(interval->pCol);
}
((SColumnNode*)interval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)interval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
interval->pInterval = pInterval;
interval->pOffset = pOffset;
interval->pSliding = pSliding;
......@@ -792,6 +804,7 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
select->isDistinct = isDistinct;
select->pProjectionList = pProjectionList;
select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select);
return (SNode*)select;
}
......
......@@ -271,6 +271,10 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
bool found = false;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) {
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol);
return true;
}
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
......@@ -1865,6 +1869,7 @@ static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt)
if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY;
}
sprintf(pSelect->stmtName, "%p", pSelect);
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pTable) {
......@@ -1873,6 +1878,7 @@ static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt)
}
strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB);
strcpy(pTable->table.tableName, getSysTableName(showType));
strcpy(pTable->table.tableAlias, pTable->table.tableName);
pSelect->pFromTable = (SNode*)pTable;
*pStmt = pSelect;
......
......@@ -22,32 +22,6 @@ extern "C" {
#include "planner.h"
#define CHECK_ALLOC(p, res) \
do { \
if (NULL == (p)) { \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
return (res); \
} \
} while (0)
#define CHECK_CODE(exec, res) \
do { \
int32_t code = (exec); \
if (TSDB_CODE_SUCCESS != code) { \
pCxt->errCode = code; \
return (res); \
} \
} while (0)
#define CHECK_CODE_EXT(exec) \
do { \
int32_t code = (exec); \
if (TSDB_CODE_SUCCESS != code) { \
pCxt->errCode = code; \
return code; \
} \
} while (0)
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
......
......@@ -45,7 +45,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
}
if (nodesEqualNode(pExpr, *pNode)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
pCol->node.resType = pToBeRewrittenExpr->resType;
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
......@@ -86,7 +88,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
}
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
static int32_t rewriteId = 1;
static int32_t rewriteId = 1; // todo modify
SNameExprCxt nameCxt = { .rewriteId = rewriteId };
nodesWalkList(pExprs, doNameExpr, &nameCxt);
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
......@@ -291,13 +293,14 @@ static int32_t createLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSel
return code;
}
static SColumnNode* createColumnByExpr(SExprNode* pExpr) {
static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
strcpy(pCol->tableAlias, pStmtName);
return pCol;
}
......@@ -311,20 +314,22 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: {
SNode* pCol = nodesCloneNode(pNode);
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
CHECK_CODE(nodesListAppend(pCxt->pList, pCol), DEAL_RES_ERROR);
return DEAL_RES_IGNORE_CHILD;
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
if (NULL == pCol) {
return DEAL_RES_ERROR;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
CHECK_CODE(nodesListAppend(pCxt->pList, (SNode*)pCol), DEAL_RES_ERROR);
return DEAL_RES_IGNORE_CHILD;
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
default:
break;
......@@ -457,6 +462,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {
nodesDestroyNode(pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
if (NULL == pWindow->pFill) {
......@@ -485,7 +496,42 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return TSDB_CODE_FAILED;
}
static int32_t createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pCols) {
static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pOrderByList) {
return TSDB_CODE_SUCCESS;
}
SSortLogicNode* pSort = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pCols = NULL;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, &pCols);
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
pSort->node.pTargets = nodesCloneList(pCols);
if (NULL == pSort->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
pSort->pSortKeys = nodesCloneList(pSelect->pOrderByList);
if (NULL == pSort->pSortKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pSort;
} else {
nodesDestroyNode(pSort);
}
return code;
}
static int32_t createColumnByProjections(SLogicPlanContext* pCxt, const char* pStmtName, SNodeList* pExprs, SNodeList** pCols) {
SNodeList* pList = nodesMakeList();
if (NULL == pList) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -493,7 +539,7 @@ static int32_t createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pEx
SNode* pNode;
FOREACH(pNode, pExprs) {
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, createColumnByExpr((SExprNode*)pNode))) {
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, createColumnByExpr(pStmtName, (SExprNode*)pNode))) {
nodesDestroyList(pList);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -515,9 +561,10 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
if (NULL == pProject->pProjections) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pProject->stmtName, pSelect->stmtName);
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByProjections(pCxt,pSelect->pProjectionList, &pProject->node.pTargets);
code = createColumnByProjections(pCxt, pSelect->stmtName, pSelect->pProjectionList, &pProject->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -538,6 +585,9 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
if (TSDB_CODE_SUCCESS == code) {
code = createChildLogicNode(pCxt, pSelect, createAggLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createChildLogicNode(pCxt, pSelect, createSortLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createChildLogicNode(pCxt, pSelect, createProjectLogicNode, &pRoot);
}
......
......@@ -17,9 +17,14 @@
#include "functionMgt.h"
typedef struct SSlotIdInfo {
int16_t slotId;
bool set;
} SSlotIdInfo;
typedef struct SSlotIndex {
int16_t dataBlockId;
int16_t slotId;
SArray* pSlotIdsInfo; // duplicate name slot
} SSlotIndex;
typedef struct SPhysiPlanContext {
......@@ -30,72 +35,195 @@ typedef struct SPhysiPlanContext {
SArray* pExecNodeList;
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, char* pKey) {
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
if (QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode)) {
return getSlotKey(((SOrderByExprNode*)pNode)->pExpr, pStmtName, pKey);
}
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (NULL != pStmtName) {
return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
}
if ('\0' == pCol->tableAlias[0]) {
return sprintf(pKey, "%s", pCol->colName);
}
return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
}
if (NULL != pStmtName) {
return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
}
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) {
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) {
SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
CHECK_ALLOC(pSlot, NULL);
if (NULL == pSlot) {
return NULL;
}
pSlot->slotId = slotId;
pSlot->dataType = ((SExprNode*)pNode)->resType;
pSlot->reserve = false;
pSlot->output = true;
pSlot->output = output;
return (SNode*)pSlot;
}
static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) {
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
if (NULL == pTarget) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pTarget->dataBlockId = dataBlockId;
pTarget->slotId = slotId;
pTarget->pExpr = pNode;
return (SNode*)pTarget;
*pOutput = (SNode*)pTarget;
return TSDB_CODE_SUCCESS;
}
static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
SHashObj* pHash = NULL;
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
if (NULL != pIndex) {
SSlotIdInfo info = { .slotId = slotId, .set = false };
taosArrayPush(pIndex->pSlotIdsInfo, &info);
return TSDB_CODE_SUCCESS;
}
SSlotIndex index = { .dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo)) };
if (NULL == index.pSlotIdsInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSlotIdInfo info = { .slotId = slotId, .set = false };
taosArrayPush(index.pSlotIdsInfo, &info);
return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}
static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, NULL, name);
return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash);
}
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) {
SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
taosHashCleanup(pHash);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pDescHash = pHash;
return TSDB_CODE_SUCCESS;
}
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, SHashObj* pHash) {
pDataBlockDesc->pSlots = nodesMakeList();
if (NULL == pDataBlockDesc->pSlots) {
pDataBlockDesc->pSlots = nodesMakeList();
CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
}
pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) {
taosHashCleanup(pHash);
return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = TSDB_CODE_SUCCESS;
int16_t slotId = 0;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true));
if (TSDB_CODE_SUCCESS == code) {
code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
}
if (TSDB_CODE_SUCCESS == code) {
pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes;
++slotId;
} else {
break;
}
}
return code;
}
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
SDataBlockDescNode* pDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
if (NULL == pDesc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDesc->dataBlockId = pCxt->nextDataBlockId++;
SHashObj* pHash = NULL;
int32_t code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
if (TSDB_CODE_SUCCESS == code) {
code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
}
if (TSDB_CODE_SUCCESS == code) {
*pDataBlockDesc = pDesc;
} else {
pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
nodesDestroyNode(pDesc);
}
return code;
}
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
int32_t size = taosArrayGetSize(pSlotIdsInfo);
for (int32_t i = 0; i < size; ++i) {
SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
if (!pInfo->set) {
pInfo->set = true;
return pInfo->slotId;
}
}
return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0;
SNode* pNode = NULL;
int16_t slotId = taosHashGetSize(pHash);
FOREACH(pNode, pList) {
CHECK_CODE_EXT(nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId)));
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
int32_t len = getSlotKey(pNode, pStmtName, name);
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
if (NULL == pIndex) {
code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, nextSlotId, output));
if (TSDB_CODE_SUCCESS == code) {
code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
}
pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes;
slotId = nextSlotId;
++nextSlotId;
} else {
slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
}
SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId };
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, name);
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS == code) {
SNode* pTarget = NULL;
code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(pTarget);
}
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId);
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
REPLACE_NODE(pTarget);
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false);
}
pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes;
++slotId;
}
return TSDB_CODE_SUCCESS;
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true);
}
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true);
}
typedef struct SSetSlotIdCxt {
......@@ -108,16 +236,17 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, name);
int32_t len = getSlotKey(pNode, NULL, name);
SSlotIndex* pIndex = taosHashGet(pCxt->pLeftHash, name, len);
if (NULL == pIndex) {
pIndex = taosHashGet(pCxt->pRightHash, name, len);
}
// pIndex is definitely not NULL, otherwise it is a bug
CHECK_ALLOC(pIndex, DEAL_RES_ERROR);
if (NULL == pIndex) {
return DEAL_RES_ERROR;
}
((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
((SColumnNode*)pNode)->slotId = pIndex->slotId;
CHECK_ALLOC(pNode, DEAL_RES_ERROR);
((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
......@@ -144,7 +273,7 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return TSDB_CODE_SUCCESS;
}
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList, SNodeList** pOutput) {
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, const SNodeList* pList, SNodeList** pOutput) {
SNodeList* pRes = nodesCloneList(pList);
if (NULL == pRes) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -164,18 +293,17 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return TSDB_CODE_SUCCESS;
}
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) {
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
if (NULL == pPhysiNode) {
return NULL;
}
pPhysiNode->pOutputDataBlockDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
if (NULL == pPhysiNode->pOutputDataBlockDesc) {
int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pPhysiNode);
return NULL;
}
pPhysiNode->pOutputDataBlockDesc->dataBlockId = pCxt->nextDataBlockId++;
pPhysiNode->pOutputDataBlockDesc->type = QUERY_NODE_DATABLOCK_DESC;
return pPhysiNode;
}
......@@ -186,24 +314,11 @@ static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pL
return TSDB_CODE_SUCCESS;
}
static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) {
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
SNode* pNode;
FOREACH(pNode, pTargets) {
int32_t len = getSlotKey(pNode, name);
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
// pIndex is definitely not NULL, otherwise it is a bug
CHECK_ALLOC(pIndex, TSDB_CODE_FAILED);
((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true;
}
return TSDB_CODE_SUCCESS;
}
static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_ALLOC(pCol, NULL);
if (NULL == pCol) {
return NULL;
}
pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
pCol->tableId = tableId;
......@@ -244,8 +359,12 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode)
|| QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) {
pScanPhysiNode->pScanCols = nodesMakeList();
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
if (NULL == pScanPhysiNode->pScanCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode;
FOREACH(pNode, pScanCols) {
......@@ -255,29 +374,29 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
strcpy(pCol->colName, ((SColumnNode*)pNode)->colName);
continue;
}
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode)));
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
} else {
pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == pScanPhysiNode->pScanCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
// return sortScanCols(pScanPhysiNode->pScanCols);
return TSDB_CODE_SUCCESS;
return sortScanCols(pScanPhysiNode->pScanCols);
}
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
if (TSDB_CODE_SUCCESS == code) {
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
code = addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = setSlotOutput(pCxt, pScanLogicNode->node.pTargets, pScanPhysiNode->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
......@@ -302,7 +421,7 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd
}
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
if (NULL == pTagScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -310,7 +429,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* p
}
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == pTableScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -326,7 +445,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
}
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -347,7 +466,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
}
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -411,7 +530,7 @@ static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode*
}
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -424,14 +543,11 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
code = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc, &pJoin->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, pJoin->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pJoin;
......@@ -451,7 +567,9 @@ typedef struct SRewritePrecalcExprsCxt {
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
SNode* pExpr = nodesCloneNode(*pNode);
CHECK_ALLOC(pExpr, DEAL_RES_ERROR);
if (NULL == pExpr) {
return DEAL_RES_ERROR;
}
if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
nodesDestroyNode(pExpr);
return DEAL_RES_ERROR;
......@@ -499,11 +617,15 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
if (NULL == *pPrecalcExprs) {
*pPrecalcExprs = nodesMakeList();
CHECK_ALLOC(*pPrecalcExprs, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == *pPrecalcExprs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (NULL == *pRewrittenList) {
*pRewrittenList = nodesMakeList();
CHECK_ALLOC(*pRewrittenList, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == *pRewrittenList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SNode* pNode = NULL;
FOREACH(pNode, pList) {
......@@ -513,8 +635,12 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
} else {
pNew = nodesCloneNode(pNode);
}
CHECK_ALLOC(pNew, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(nodesListAppend(*pRewrittenList, pNew), TSDB_CODE_OUT_OF_MEMORY);
if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SRewritePrecalcExprsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs };
nodesRewriteList(*pRewrittenList, doRewritePrecalcExprs, &cxt);
......@@ -526,7 +652,7 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN
}
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_AGG);
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -544,30 +670,27 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe);
code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
}
if (TSDB_CODE_SUCCESS == code) {
code = setSlotOutput(pCxt, pAggLogicNode->node.pTargets, pAgg->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pAgg;
......@@ -575,18 +698,22 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
nodesDestroyNode(pAgg);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyList(pGroupKeys);
nodesDestroyList(pAggFuncs);
return code;
}
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
if (NULL == pProject) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections, &pProject->pProjections);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc);
code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections, pProject->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
......@@ -602,34 +729,30 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
}
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pExchange;
} else {
nodesDestroyNode(pExchange);
}
*pPhyNode = (SPhysiNode*)pExchange;
return code;
return TSDB_CODE_SUCCESS;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc);
int32_t code = TSDB_CODE_SUCCESS;
pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -659,21 +782,17 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pWindow->pExprs, pChildTupe);
code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockDesc(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pWindow->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pWindow;
} else {
......@@ -684,7 +803,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
if (NULL == pInterval) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -701,11 +820,18 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pInterval);
return code;
}
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -729,6 +855,41 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
return TSDB_CODE_FAILED;
}
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode, SPhysiNode** pPhyNode) {
SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pPrecalcExprs = NULL;
SNodeList* pSortKeys = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pSort->pSortKeys, pSort->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pSort;
} else {
nodesDestroyNode(pSort);
}
return code;
}
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) {
switch (nodeType(pLogicNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
......@@ -743,6 +904,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_SORT:
return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
default:
break;
}
......@@ -874,17 +1037,22 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
SNodeListNode* pGroup;
if (level >= LIST_LENGTH(pSubplans)) {
pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(nodesListStrictAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
if (NULL == pGroup) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pGroup)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
pGroup = nodesListGetNode(pSubplans, level);
}
if (NULL == pGroup->pNodeList) {
pGroup->pNodeList = nodesMakeList();
CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == pGroup->pNodeList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_SUCCESS;
return nodesListStrictAppend(pGroup->pNodeList, pSubplan);
}
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {
......
......@@ -65,7 +65,9 @@ static int32_t stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
SStsInfo* pInfo = taosMemoryCalloc(1, sizeof(SStsInfo));
CHECK_ALLOC(pInfo, TSDB_CODE_OUT_OF_MEMORY);
if (NULL == pInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
pCxt->pInfo = pInfo;
......
......@@ -170,7 +170,7 @@ TEST_F(PlannerTest, groupBy) {
bind("SELECT count(*) FROM t1");
ASSERT_TRUE(run());
bind("SELECT c1, count(*) FROM t1 GROUP BY c1");
bind("SELECT c1, max(c3), min(c2), count(*) FROM t1 GROUP BY c1");
ASSERT_TRUE(run());
bind("SELECT c1 + c3, c1 + count(*) FROM t1 where c2 = 'abc' GROUP BY c1, c3");
......@@ -201,10 +201,31 @@ TEST_F(PlannerTest, sessionWindow) {
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, orderBy) {
setDatabase("root", "test");
bind("SELECT * FROM t1 order by c1");
ASSERT_TRUE(run());
bind("SELECT c1 FROM t1 order by c2");
ASSERT_TRUE(run());
bind("SELECT * FROM t1 order by c1 + 10, c2");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, showTables) {
setDatabase("root", "test");
bind("show tables");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, showStables) {
setDatabase("root", "test");
bind("show stables");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, createTopic) {
......
......@@ -947,10 +947,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册