未验证 提交 87b81314 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #14266 from taosdata/feature/3.0_debug_wxy

feat: tail function rewrite to statement
......@@ -236,7 +236,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t*
int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
......
......@@ -24,6 +24,8 @@ extern "C" {
#include "querynodes.h"
#include "tname.h"
#define SLOT_NAME_LEN TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN
typedef struct SLogicNode {
ENodeType type;
SNodeList* pTargets; // SColumnNode
......@@ -74,8 +76,8 @@ typedef struct SScanLogicNode {
int16_t tsColId;
double filesFactor;
SArray* pSmaIndexes;
SNodeList* pPartTags;
bool partSort;
SNodeList* pGroupTags;
bool groupSort;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -100,6 +102,7 @@ typedef struct SProjectLogicNode {
typedef struct SIndefRowsFuncLogicNode {
SLogicNode node;
SNodeList* pFuncs;
bool isTailFunc;
} SIndefRowsFuncLogicNode;
typedef struct SInterpFuncLogicNode {
......@@ -138,6 +141,7 @@ typedef struct SMergeLogicNode {
SNodeList* pInputs;
int32_t numOfChannels;
int32_t srcGroupId;
bool groupSort;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
......@@ -184,6 +188,7 @@ typedef struct SFillLogicNode {
typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
} SSortLogicNode;
typedef struct SPartitionLogicNode {
......@@ -230,6 +235,7 @@ typedef struct SSlotDescNode {
bool reserve;
bool output;
bool tag;
char name[SLOT_NAME_LEN];
} SSlotDescNode;
typedef struct SDataBlockDescNode {
......@@ -279,7 +285,8 @@ typedef struct STableScanPhysiNode {
double ratio;
int32_t dataRequired;
SNodeList* pDynamicScanFuncs;
SNodeList* pPartitionTags;
SNodeList* pGroupTags;
bool groupSort;
int64_t interval;
int64_t offset;
int64_t sliding;
......@@ -353,6 +360,7 @@ typedef struct SMergePhysiNode {
SNodeList* pTargets;
int32_t numOfChannels;
int32_t srcGroupId;
bool groupSort;
} SMergePhysiNode;
typedef struct SWinodwPhysiNode {
......
......@@ -259,6 +259,7 @@ typedef struct SSelectStmt {
bool hasTailFunc;
bool hasInterpFunc;
bool hasLastRowFunc;
bool groupSort;
} SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
......
......@@ -1605,7 +1605,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
char pBuf[128] = {0};
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
......@@ -1613,7 +1613,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) {
......
......@@ -487,7 +487,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
#if 1
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowData(pResult, flag);
blockDebugShowDataBlocks(pResult, flag);
#endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
......
......@@ -299,7 +299,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
blockDebugShowData(data, __func__);
blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
}
......
......@@ -136,6 +136,13 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
*/
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
/**
*
* @param pVHandle
* @return
*/
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
/**
*
* @param pSortHandle
......
......@@ -2051,7 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
return TSDB_CODE_SUCCESS;
}
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2455,7 +2455,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
if (pTableScanNode->pPartitionTags) {
if (pTableScanNode->pGroupTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
}
......
......@@ -142,7 +142,8 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
return pOperator->fpSet.getNextFn(pOperator);
SSDataBlock *pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
}
// todo refactor: merged with fetch fp
......@@ -505,7 +506,9 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSDataBlock* pInputBlock;
int64_t startTs; // sort start time
uint64_t groupId;
bool hasGroupId;
uint64_t groupId;
STupleHandle *prefetchedTuple;
} SMultiwaySortMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
......@@ -560,12 +563,30 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
STupleHandle* pTupleHandle = NULL;
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) {
break;
}
......@@ -608,7 +629,6 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
......
......@@ -752,6 +752,10 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
}
}
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
return pVHandle->pBlock->info.groupId;
}
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0};
......
......@@ -351,7 +351,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
CLONE_NODE_LIST_FIELD(pPartTags);
CLONE_NODE_LIST_FIELD(pGroupTags);
return TSDB_CODE_SUCCESS;
}
......@@ -401,6 +401,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
CLONE_NODE_LIST_FIELD(pInputs);
COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(groupSort);
return TSDB_CODE_SUCCESS;
}
......@@ -436,6 +437,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
return TSDB_CODE_SUCCESS;
}
......@@ -500,7 +502,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionTags);
CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
......
......@@ -234,6 +234,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiMerge";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
return "PhysiGroupSort";
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
return "PhysiHashInterval";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
......@@ -539,7 +541,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanPartTags = "PartTags";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
......@@ -561,7 +563,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags);
}
return code;
......@@ -588,7 +590,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags);
}
return code;
......@@ -1430,7 +1432,8 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
......@@ -1485,7 +1488,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
code = nodeListToJson(pJson, jkTableScanPhysiPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
}
return code;
......@@ -1544,7 +1550,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
code = jsonToNodeList(pJson, jkTableScanPhysiPlanGroupTags, &pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
}
return code;
......@@ -1725,6 +1734,7 @@ static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
static const char* jkMergePhysiPlanTargets = "Targets";
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
......@@ -1742,6 +1752,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort);
}
return code;
}
......@@ -1762,6 +1775,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort);
}
return code;
}
......@@ -3369,6 +3385,7 @@ static const char* jkSlotDescSlotId = "SlotId";
static const char* jkSlotDescDataType = "DataType";
static const char* jkSlotDescReserve = "Reserve";
static const char* jkSlotDescOutput = "Output";
static const char* jkSlotDescName = "Name";
static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
......@@ -3383,6 +3400,9 @@ static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSlotDescOutput, pNode->output);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSlotDescName, pNode->name);
}
return code;
}
......@@ -3400,6 +3420,9 @@ static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSlotDescOutput, &pNode->output);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSlotDescName, pNode->name);
}
return code;
}
......@@ -4137,6 +4160,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return physiMergeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
......@@ -4280,6 +4304,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return jsonToPhysiMergeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
......
......@@ -500,7 +500,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SSortPhysiNode* pSort = (SSortPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......
......@@ -288,6 +288,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SMergePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
return makeNode(type, sizeof(SGroupSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
......@@ -709,7 +711,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTagCond);
nodesDestroyNode(pLogicNode->pTagIndexCond);
taosArrayDestroy(pLogicNode->pSmaIndexes);
nodesDestroyList(pLogicNode->pPartTags);
nodesDestroyList(pLogicNode->pGroupTags);
break;
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
......@@ -813,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) {
STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode;
destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pDynamicScanFuncs);
nodesDestroyList(pPhyNode->pPartitionTags);
nodesDestroyList(pPhyNode->pGroupTags);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
......@@ -850,7 +852,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pPhyNode->pTargets);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SSortPhysiNode* pPhyNode = (SSortPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
......
......@@ -2456,90 +2456,6 @@ static int32_t rewriteUniqueStmt(STranslateContext* pCxt, SSelectStmt* pSelect)
return cxt.pTranslateCxt->errCode;
}
typedef struct SRwriteTailCxt {
STranslateContext* pTranslateCxt;
int64_t limit;
int64_t offset;
} SRwriteTailCxt;
static EDealRes rewriteTailFunc(SNode** pNode, void* pContext) {
SRwriteTailCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (FUNCTION_TYPE_TAIL == pFunc->funcType) {
pCxt->limit = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i;
if (3 == LIST_LENGTH(pFunc->pParameterList)) {
pCxt->offset = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 2))->datum.i;
}
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0);
strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName);
NODES_CLEAR_LIST(pFunc->pParameterList);
nodesDestroyNode(*pNode);
*pNode = pExpr;
return DEAL_RES_IGNORE_CHILD;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t createLimieNode(SRwriteTailCxt* pCxt, SLimitNode** pOutput) {
*pOutput = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
if (NULL == *pOutput) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pOutput)->limit = pCxt->limit;
(*pOutput)->offset = pCxt->offset;
return TSDB_CODE_SUCCESS;
}
static SNode* createOrderByExpr(STranslateContext* pCxt) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrder) {
return NULL;
}
pCxt->errCode = createPrimaryKeyCol(pCxt, &pOrder->pExpr);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
nodesDestroyNode((SNode*)pOrder);
return NULL;
}
pOrder->order = ORDER_DESC;
pOrder->nullOrder = NULL_ORDER_FIRST;
return (SNode*)pOrder;
}
/* case 1:
* in: select tail(expr, k, f) from t where_clause
* out: select expr from t where_clause order by _rowts desc limit k offset f
*
* case 2:
* in: select tail(expr, k, f) from t where_clause partition_by_clause
* out: select expr from t where_clause partition_by_clause sort by _rowts desc limit k offset f
*
* case 3:
* in: select tail(expr, k, f) from t where_clause order_by_clause limit_clause
* out: select expr from (
* select expr from t where_clause order by _rowts desc limit k offset f
* ) order_by_clause limit_clause
*
* case 4:
* in: select tail(expr, k, f) from t where_clause partition_by_clause limit_clause
* out:
*/
static int32_t rewriteTailStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasTailFunc) {
return TSDB_CODE_SUCCESS;
}
SRwriteTailCxt cxt = {.pTranslateCxt = pCxt, .limit = -1, .offset = -1};
nodesRewriteExprs(pSelect->pProjectionList, rewriteTailFunc, &cxt);
int32_t code = nodesListMakeStrictAppend(&pSelect->pOrderByList, createOrderByExpr(pCxt));
if (TSDB_CODE_SUCCESS == code) {
code = createLimieNode(&cxt, &pSelect->pLimit);
}
pSelect->hasIndefiniteRowsFunc = false;
return code;
}
typedef struct SReplaceOrderByAliasCxt {
STranslateContext* pTranslateCxt;
SNodeList* pProjectionList;
......@@ -2616,9 +2532,6 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
code = rewriteUniqueStmt(pCxt, pSelect);
}
// if (TSDB_CODE_SUCCESS == code) {
// code = rewriteTailStmt(pCxt, pSelect);
// }
if (TSDB_CODE_SUCCESS == code) {
code = rewriteTimelineFunc(pCxt, pSelect);
}
......
......@@ -507,6 +507,8 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
return TSDB_CODE_OUT_OF_MEMORY;
}
pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc;
// indefinite rows functions and _select_values functions
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
......@@ -733,6 +735,8 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->groupSort = pSelect->groupSort;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) {
code = nodesListMakeStrictAppend(&pSort->node.pTargets,
......
......@@ -20,9 +20,8 @@
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2)
#define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......@@ -76,7 +75,7 @@ static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func)
return NULL;
}
EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) {
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
// *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
*((bool*)pContext) = true;
......@@ -85,14 +84,14 @@ EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool osdHaveNormalCol(SNodeList* pList) {
static bool scanPathOptHaveNormalCol(SNodeList* pList) {
bool res = false;
nodesWalkExprsPostOrder(pList, osdHaveNormalColImpl, &res);
nodesWalkExprsPostOrder(pList, scanPathOptHaveNormalColImpl, &res);
return res;
}
static bool osdMayBeOptimized(SLogicNode* pNode) {
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) {
static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH)) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
......@@ -108,10 +107,10 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) {
return true;
}
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
return !scanPathOptHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
static SNodeList* scanPathOptGetAllFuncs(SLogicNode* pNode) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return ((SWindowLogicNode*)pNode)->pFuncs;
......@@ -123,7 +122,7 @@ static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
return NULL;
}
static bool needOptimizeDataRequire(const SFunctionNode* pFunc) {
static bool scanPathOptNeedOptimizeDataRequire(const SFunctionNode* pFunc) {
if (!fmIsSpecialDataRequiredFunc(pFunc->funcId)) {
return false;
}
......@@ -136,7 +135,7 @@ static bool needOptimizeDataRequire(const SFunctionNode* pFunc) {
return true;
}
static bool needOptimizeDynamicScan(const SFunctionNode* pFunc) {
static bool scanPathOptNeedDynOptimize(const SFunctionNode* pFunc) {
if (!fmIsDynamicScanOptimizedFunc(pFunc->funcId)) {
return false;
}
......@@ -149,17 +148,17 @@ static bool needOptimizeDynamicScan(const SFunctionNode* pFunc) {
return true;
}
static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs, SNodeList** pDsoFuncs) {
SNodeList* pAllFuncs = osdGetAllFuncs(pScan->node.pParent);
static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs, SNodeList** pDsoFuncs) {
SNodeList* pAllFuncs = scanPathOptGetAllFuncs(pScan->node.pParent);
SNodeList* pTmpSdrFuncs = NULL;
SNodeList* pTmpDsoFuncs = NULL;
SNode* pFunc = NULL;
bool otherFunc = false;
FOREACH(pFunc, pAllFuncs) {
int32_t code = TSDB_CODE_SUCCESS;
if (needOptimizeDataRequire((SFunctionNode*)pFunc)) {
if (scanPathOptNeedOptimizeDataRequire((SFunctionNode*)pFunc)) {
code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pFunc));
} else if (needOptimizeDynamicScan((SFunctionNode*)pFunc)) {
} else if (scanPathOptNeedDynOptimize((SFunctionNode*)pFunc)) {
code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pFunc));
} else {
otherFunc = true;
......@@ -180,15 +179,15 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs,
return TSDB_CODE_SUCCESS;
}
static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, osdMayBeOptimized);
static int32_t scanPathOptMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, scanPathOptMayBeOptimized);
if (NULL == pInfo->pScan) {
return TSDB_CODE_SUCCESS;
}
return osdGetRelatedFuncs(pInfo->pScan, &pInfo->pSdrFuncs, &pInfo->pDsoFuncs);
return scanPathOptGetRelatedFuncs(pInfo->pScan, &pInfo->pSdrFuncs, &pInfo->pDsoFuncs);
}
static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l, EFuncDataRequired r) {
static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFuncDataRequired r) {
switch (l) {
case FUNC_DATA_REQUIRED_DATA_LOAD:
return l;
......@@ -202,19 +201,19 @@ static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l, EFuncDataRe
return r;
}
static int32_t osdGetDataRequired(SNodeList* pFuncs) {
static int32_t scanPathOptGetDataRequired(SNodeList* pFuncs) {
if (NULL == pFuncs) {
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_FILTEROUT;
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
dataRequired = scanPathOptPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
}
return dataRequired;
}
static void setScanWindowInfo(SScanLogicNode* pScan) {
static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
SLogicNode* pParent = pScan->node.pParent;
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
......@@ -233,23 +232,23 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
}
}
static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOsdInfo info = {0};
int32_t code = osdMatch(pCxt, pLogicSubplan->pNode, &info);
int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info);
if (TSDB_CODE_SUCCESS == code && info.pScan) {
setScanWindowInfo((SScanLogicNode*)info.pScan);
scanPathOptSetScanWin((SScanLogicNode*)info.pScan);
}
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
info.pScan->dataRequired = osdGetDataRequired(info.pSdrFuncs);
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD);
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
pCxt->optimized = true;
}
nodesDestroyList(info.pSdrFuncs);
return code;
}
static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) {
static int32_t pushDownCondOptMergeCond(SNode** pDst, SNode** pSrc) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -270,7 +269,7 @@ static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) {
return code;
}
static int32_t cpdCondAppend(SNode** pCond, SNode** pAdditionalCond) {
static int32_t pushDownCondOptAppendCond(SNode** pCond, SNode** pAdditionalCond) {
if (NULL == *pCond) {
TSWAP(*pCond, *pAdditionalCond);
return TSDB_CODE_SUCCESS;
......@@ -283,16 +282,16 @@ static int32_t cpdCondAppend(SNode** pCond, SNode** pAdditionalCond) {
*pAdditionalCond = NULL;
}
} else {
code = cpdMergeCond(pCond, pAdditionalCond);
code = pushDownCondOptMergeCond(pCond, pAdditionalCond);
}
return code;
}
static int32_t cpdCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
SNode** pOtherCond) {
static int32_t pushDownCondOptCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
SNode** pOtherCond) {
int32_t code = TSDB_CODE_SUCCESS;
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
code = cpdCondAppend(pOtherCond, pPrimaryKeyCond);
code = pushDownCondOptAppendCond(pOtherCond, pPrimaryKeyCond);
} else {
bool isStrict = false;
code = filterGetTimeRange(*pPrimaryKeyCond, &pScan->scanRange, &isStrict);
......@@ -300,7 +299,7 @@ static int32_t cpdCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, S
if (isStrict) {
nodesDestroyNode(*pPrimaryKeyCond);
} else {
code = cpdCondAppend(pOtherCond, pPrimaryKeyCond);
code = pushDownCondOptAppendCond(pOtherCond, pPrimaryKeyCond);
}
*pPrimaryKeyCond = NULL;
}
......@@ -308,8 +307,9 @@ static int32_t cpdCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, S
return code;
}
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) ||
static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE) ||
TSDB_SYSTEM_TABLE == pScan->tableType) {
return TSDB_CODE_SUCCESS;
}
......@@ -319,14 +319,14 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
int32_t code = nodesPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond,
&pOtherCond);
if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) {
code = cpdCalcTimeRange(pCxt, pScan, &pPrimaryKeyCond, &pOtherCond);
code = pushDownCondOptCalcTimeRange(pCxt, pScan, &pPrimaryKeyCond, &pOtherCond);
}
if (TSDB_CODE_SUCCESS == code) {
pScan->node.pConditions = pOtherCond;
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
} else {
nodesDestroyNode(pPrimaryKeyCond);
......@@ -336,7 +336,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return code;
}
static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static bool pushDownCondOptBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
SNode* pTableCol = NULL;
FOREACH(pTableCol, pTableCols) {
if (nodesEqualNode(pCondCol, pTableCol)) {
......@@ -346,12 +346,12 @@ static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
return false;
}
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
static EDealRes pushDownCondOptIsCrossTableCond(SNode* pNode, void* pContext) {
SCpdIsMultiTableCondCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (cpdBelongThisTable(pNode, pCxt->pLeftCols)) {
if (pushDownCondOptBelongThisTable(pNode, pCxt->pLeftCols)) {
pCxt->havaLeftCol = true;
} else if (cpdBelongThisTable(pNode, pCxt->pRightCols)) {
} else if (pushDownCondOptBelongThisTable(pNode, pCxt->pRightCols)) {
pCxt->haveRightCol = true;
}
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
......@@ -359,10 +359,11 @@ static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols, SNode* pNode) {
static ECondAction pushDownCondOptGetCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols,
SNode* pNode) {
SCpdIsMultiTableCondCxt cxt = {
.pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false};
nodesWalkExpr(pNode, cpdIsMultiTableCondImpl, &cxt);
nodesWalkExpr(pNode, pushDownCondOptIsCrossTableCond, &cxt);
return (JOIN_TYPE_INNER != joinType
? COND_ACTION_STAY
: (cxt.havaLeftCol && cxt.haveRightCol
......@@ -370,8 +371,8 @@ static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNode
: (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD)));
}
static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions;
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
return TSDB_CODE_SUCCESS;
......@@ -387,7 +388,7 @@ static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNo
SNodeList* pRemainConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond);
ECondAction condAction = pushDownCondOptGetCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond);
if (COND_ACTION_PUSH_JOIN == condAction) {
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
......@@ -439,11 +440,12 @@ static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNo
return code;
}
static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
static int32_t pushDownCondOptPartOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions);
ECondAction condAction =
pushDownCondOptGetCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions);
if (COND_ACTION_STAY == condAction) {
return TSDB_CODE_SUCCESS;
} else if (COND_ACTION_PUSH_JOIN == condAction) {
......@@ -457,35 +459,35 @@ static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode*
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
static int32_t pushDownCondOptPartCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) {
return cpdPartitionLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
return pushDownCondOptPartLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
} else {
return cpdPartitionOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
return pushDownCondOptPartOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
}
}
static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return cpdCondAppend(&pJoin->pOnConditions, pCond);
static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return pushDownCondOptAppendCond(&pJoin->pOnConditions, pCond);
}
static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
return cpdCondAppend(&pScan->node.pConditions, pCond);
static int32_t pushDownCondOptPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
return pushDownCondOptAppendCond(&pScan->node.pConditions, pCond);
}
static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
switch (nodeType(pChild)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return cpdPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
default:
break;
}
planError("cpdPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild)));
planError("pushDownCondOptPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild)));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
......@@ -493,10 +495,10 @@ static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
return false;
}
return cpdBelongThisTable(pNode, pTableCols);
return pushDownCondOptBelongThisTable(pNode, pTableCols);
}
static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
......@@ -508,15 +510,15 @@ static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) {
return cpdIsPrimaryKey(pOper->pRight, pRightCols);
} else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) {
return cpdIsPrimaryKey(pOper->pRight, pLeftCols);
if (pushDownCondOptIsPriKey(pOper->pLeft, pLeftCols)) {
return pushDownCondOptIsPriKey(pOper->pRight, pRightCols);
} else if (pushDownCondOptIsPriKey(pOper->pLeft, pRightCols)) {
return pushDownCondOptIsPriKey(pOper->pRight, pLeftCols);
}
return false;
}
static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pCond)) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pCond;
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
......@@ -525,54 +527,56 @@ static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
bool hasPrimaryKeyEqualCond = false;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (cpdContainPrimaryKeyEqualCond(pJoin, pCond)) {
if (pushDownCondOptContainPriKeyEqualCond(pJoin, pCond)) {
hasPrimaryKeyEqualCond = true;
break;
}
}
return hasPrimaryKeyEqualCond;
} else {
return cpdIsPrimaryKeyEqualCond(pJoin, pCond);
return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond);
}
}
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
}
if (!cpdContainPrimaryKeyEqualCond(pJoin, pJoin->pOnConditions)) {
if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOnConditions)) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pJoin->node.pConditions) {
return cpdCheckJoinOnCond(pCxt, pJoin);
return pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
}
SNode* pOnCond = NULL;
SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL;
int32_t code = cpdPartitionCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond);
int32_t code = pushDownCondOptPartCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond);
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
code = cpdPushCondToOnCond(pCxt, pJoin, &pOnCond);
code = pushDownCondOptPushCondToOnCond(pCxt, pJoin, &pOnCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
code =
pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) {
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
code =
pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
code = cpdCheckJoinOnCond(pCxt, pJoin);
code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
} else {
nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond);
......@@ -582,22 +586,22 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
return code;
}
static int32_t cpdPushAggCondition(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
// todo
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pLogicNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = cpdOptimizeScanCondition(pCxt, (SScanLogicNode*)pLogicNode);
code = pushDownCondOptDealScan(pCxt, (SScanLogicNode*)pLogicNode);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = cpdPushJoinCondition(pCxt, (SJoinLogicNode*)pLogicNode);
code = pushDownCondOptDealJoin(pCxt, (SJoinLogicNode*)pLogicNode);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
code = cpdPushAggCondition(pCxt, (SAggLogicNode*)pLogicNode);
code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode);
break;
default:
break;
......@@ -605,7 +609,7 @@ static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode)
if (TSDB_CODE_SUCCESS == code) {
SNode* pChild = NULL;
FOREACH(pChild, pLogicNode->pChildren) {
code = cpdPushCondition(pCxt, (SLogicNode*)pChild);
code = pushDownCondOptimizeImpl(pCxt, (SLogicNode*)pChild);
if (TSDB_CODE_SUCCESS != code) {
break;
}
......@@ -614,11 +618,11 @@ static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode)
return code;
}
static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
return cpdPushCondition(pCxt, pLogicSubplan->pNode);
static int32_t pushDownCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
return pushDownCondOptimizeImpl(pCxt, pLogicSubplan->pNode);
}
static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
static bool sortPriKeyOptIsPriKeyOrderBy(SNodeList* pSortKeys) {
if (1 != LIST_LENGTH(pSortKeys)) {
return false;
}
......@@ -626,17 +630,18 @@ static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false);
}
static bool opkSortMayBeOptimized(SLogicNode* pNode) {
static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) {
return false;
}
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OPK)) {
return false;
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
if (pSort->groupSort || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
return true;
}
static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -646,9 +651,11 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
}
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = opkGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS == code) {
code = opkGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
code =
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
}
return code;
case QUERY_NODE_LOGIC_PLAN_AGG:
......@@ -663,77 +670,58 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
return TSDB_CODE_SUCCESS;
}
return opkGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
return sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
}
static int32_t opkGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
static int32_t sortPriKeyOptGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
bool notOptimize = false;
int32_t code = opkGetScanNodesImpl(pNode, &notOptimize, pScanNodes);
int32_t code = sortPriKeyOptGetScanNodesImpl(pNode, &notOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
nodesClearList(*pScanNodes);
}
return code;
}
static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) {
static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
}
static SNode* opkRewriteDownNode(SSortLogicNode* pSort) {
SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0);
// todo
NODES_CLEAR_LIST(pSort->node.pChildren);
return pDownNode;
}
static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNodeList* pScanNodes) {
EOrder order = opkGetPrimaryKeyOrder(pSort);
static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort,
SNodeList* pScanNodes) {
EOrder order = sortPriKeyOptGetPriKeyOrder(pSort);
if (ORDER_DESC == order) {
SNode* pScan = NULL;
FOREACH(pScan, pScanNodes) { TSWAP(((SScanLogicNode*)pScan)->scanSeq[0], ((SScanLogicNode*)pScan)->scanSeq[1]); }
}
if (NULL == pSort->node.pParent) {
// todo
return TSDB_CODE_SUCCESS;
}
SNode* pDownNode = opkRewriteDownNode(pSort);
SNode* pNode;
FOREACH(pNode, pSort->node.pParent->pChildren) {
if (nodesEqualNode(pNode, (SNode*)pSort)) {
REPLACE_NODE(pDownNode);
((SLogicNode*)pDownNode)->pParent = pSort->node.pParent;
break;
}
int32_t code =
replaceLogicNode(pLogicSubplan, (SLogicNode*)pSort, (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0));
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pSort->node.pChildren);
nodesDestroyNode((SNode*)pSort);
}
nodesDestroyNode((SNode*)pSort);
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) {
OPTIMIZE_FLAG_SET_MASK(pSort->node.optimizedFlag, OPTIMIZE_FLAG_OPK);
if (!opkIsPrimaryKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) {
SNodeList* pScanNodes = NULL;
int32_t code = opkGetScanNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
int32_t code = sortPriKeyOptGetScanNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) {
code = opkDoOptimized(pCxt, pSort, pScanNodes);
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pScanNodes);
}
nodesClearList(pScanNodes);
return code;
}
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, opkSortMayBeOptimized);
static int32_t sortPrimaryKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, sortPriKeyOptMayBeOptimized);
if (NULL == pSort) {
return TSDB_CODE_SUCCESS;
}
return opkOptimizeImpl(pCxt, pSort);
return sortPrimaryKeyOptimizeImpl(pCxt, pLogicSubplan, pSort);
}
static bool smaOptMayBeOptimized(SLogicNode* pNode) {
static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) ||
WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode->pParent)->winType) {
......@@ -748,7 +736,8 @@ static bool smaOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets, SLogicNode** pOutput) {
static int32_t smaIndexOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets,
SLogicNode** pOutput) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -767,8 +756,8 @@ static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNod
return TSDB_CODE_SUCCESS;
}
static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
SLogicNode* pSmaScan) {
static int32_t smaIndexOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
SLogicNode* pSmaScan) {
int32_t code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pSmaScan);
......@@ -781,8 +770,8 @@ static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode*
return code;
}
static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
SLogicNode** pOutput) {
static int32_t smaIndexOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
SLogicNode** pOutput) {
SScanLogicNode* pSmaScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
if (NULL == pSmaScan) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -811,7 +800,7 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
return TSDB_CODE_SUCCESS;
}
static bool smaOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
static bool smaIndexOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit ||
pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding ||
pWindow->slidingUnit != pIndex->slidingUnit) {
......@@ -831,7 +820,7 @@ static bool smaOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow
return true;
}
static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
static SNode* smaIndexOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
......@@ -846,7 +835,7 @@ static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId)
return (SNode*)pCol;
}
static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) {
static int32_t smaIndexOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) {
int32_t index = 0;
SNode* pSmaFunc = NULL;
FOREACH(pSmaFunc, pSmaFuncs) {
......@@ -858,8 +847,8 @@ static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) {
return -1;
}
static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput,
int32_t* pWStrartIndex) {
static int32_t smaIndexOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput,
int32_t* pWStrartIndex) {
SNodeList* pCols = NULL;
SNode* pFunc = NULL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -870,11 +859,11 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
*pWStrartIndex = index;
}
smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
smaFuncIndex = smaIndexOptFindSmaFunc(pFunc, pSmaFuncs);
if (smaFuncIndex < 0) {
break;
} else {
code = nodesListMakeStrictAppend(&pCols, smaOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2));
code = nodesListMakeStrictAppend(&pCols, smaIndexOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2));
if (TSDB_CODE_SUCCESS != code) {
break;
}
......@@ -891,22 +880,22 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
return code;
}
static int32_t smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList** pCols,
int32_t* pWStrartIndex) {
static int32_t smaIndexOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList** pCols,
int32_t* pWStrartIndex) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pScan->node.pParent;
if (!smaOptEqualInterval(pScan, pWindow, pIndex)) {
if (!smaIndexOptEqualInterval(pScan, pWindow, pIndex)) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pSmaFuncs = NULL;
int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex);
code = smaIndexOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex);
}
nodesDestroyList(pSmaFuncs);
return code;
}
static SNode* smaOptCreateWStartTs() {
static SNode* smaIndexOptCreateWStartTs() {
SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pWStart) {
return NULL;
......@@ -920,7 +909,7 @@ static SNode* smaOptCreateWStartTs() {
return (SNode*)pWStart;
}
static int32_t smaOptCreateMergeKey(SNode* pCol, SNodeList** pMergeKeys) {
static int32_t smaIndexOptCreateMergeKey(SNode* pCol, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -935,9 +924,9 @@ static int32_t smaOptCreateMergeKey(SNode* pCol, SNodeList** pMergeKeys) {
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
}
static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrartIndex, SNodeList** pMergeKeys) {
static int32_t smaIndexOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrartIndex, SNodeList** pMergeKeys) {
if (wstrartIndex < 0) {
SNode* pWStart = smaOptCreateWStartTs();
SNode* pWStart = smaIndexOptCreateWStartTs();
if (NULL == pWStart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -948,11 +937,11 @@ static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrar
}
wstrartIndex = LIST_LENGTH(pInterval->node.pTargets) - 1;
}
return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys);
return smaIndexOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys);
}
static int32_t smaOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
static int32_t smaIndexOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent;
SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets);
if (NULL == pMergeTargets) {
......@@ -961,42 +950,42 @@ static int32_t smaOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode*
SLogicNode* pSmaScan = NULL;
SLogicNode* pMerge = NULL;
SNodeList* pMergeKeys = NULL;
int32_t code = smaOptRewriteInterval(pInterval, wstrartIndex, &pMergeKeys);
int32_t code = smaIndexOptRewriteInterval(pInterval, wstrartIndex, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
code = smaIndexOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateMerge(pScan->node.pParent, pMergeKeys, pMergeTargets, &pMerge);
code = smaIndexOptCreateMerge(pScan->node.pParent, pMergeKeys, pMergeTargets, &pMerge);
}
if (TSDB_CODE_SUCCESS == code) {
code = smaOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan);
code = smaIndexOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan);
}
return code;
}
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
static int32_t smaIndexOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
SLogicNode* pSmaScan = NULL;
int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
int32_t code = smaIndexOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan);
}
return code;
}
static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static void smaIndexOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes);
for (int32_t i = 0; i < nindexes; ++i) {
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
SNodeList* pSmaCols = NULL;
int32_t wstrartIndex = -1;
code = smaOptCouldApplyIndex(pScan, pIndex, &pSmaCols, &wstrartIndex);
code = smaIndexOptCouldApplyIndex(pScan, pIndex, &pSmaCols, &wstrartIndex);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex);
taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex);
code = smaIndexOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex);
taosArrayDestroyEx(pScan->pSmaIndexes, smaIndexOptDestroySmaIndex);
pScan->pSmaIndexes = NULL;
break;
}
......@@ -1004,12 +993,12 @@ static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
return code;
}
static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaOptMayBeOptimized);
static int32_t smaIndexOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaIndexOptMayBeOptimized);
if (NULL == pScan) {
return TSDB_CODE_SUCCESS;
}
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan);
}
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
......@@ -1084,7 +1073,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
int32_t code = TSDB_CODE_SUCCESS;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags);
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pNode->pChildren);
......@@ -1094,7 +1083,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) {
code = nodesListMakeStrictAppend(
&pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
&pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
if (TSDB_CODE_SUCCESS != code) {
break;
}
......@@ -1102,13 +1091,11 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = partTagsOptRebuildTbanme(pScan->pPartTags);
code = partTagsOptRebuildTbanme(pScan->pGroupTags);
}
return code;
}
//=====================================================================================================================
// eliminate project optimization
static bool eliminateProjOptCheckProjColumnNames(SProjectLogicNode* pProjectNode) {
SHashObj* pProjColNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SNode* pProjection;
......@@ -1142,7 +1129,7 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
return false;
}
SNode* pProjection;
SNode* pProjection;
FOREACH(pProjection, pProjectNode->pProjections) {
SExprNode* pExprNode = (SExprNode*)pProjection;
if (QUERY_NODE_COLUMN != nodeType(pExprNode)) {
......@@ -1190,31 +1177,169 @@ static int32_t eliminateProjOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return eliminateProjOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
}
//=====================================================================================================================
// eliminate Set Operator optimization
static bool rewriteTailOptMayBeOptimized(SLogicNode* pNode) {
return QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC == nodeType(pNode) && ((SIndefRowsFuncLogicNode*)pNode)->isTailFunc;
}
static SNode* rewriteTailOptCreateOrderByExpr(SNode* pSortKey) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrder) {
return NULL;
}
pOrder->order = ORDER_DESC;
pOrder->pExpr = nodesCloneNode(pSortKey);
if (NULL == pOrder->pExpr) {
nodesDestroyNode((SNode*)pOrder);
return NULL;
}
return (SNode*)pOrder;
}
static int32_t rewriteTailOptCreateLimit(SNode* pLimit, SNode* pOffset, SNode** pOutput) {
SLimitNode* pLimitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
if (NULL == pLimitNode) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLimitNode->limit = NULL == pLimit ? -1 : ((SValueNode*)pLimit)->datum.i;
pLimitNode->offset = NULL == pOffset ? -1 : ((SValueNode*)pOffset)->datum.i;
*pOutput = (SNode*)pLimitNode;
return TSDB_CODE_SUCCESS;
}
static bool rewriteTailOptNeedGroupSort(SIndefRowsFuncLogicNode* pIndef) {
if (1 != LIST_LENGTH(pIndef->node.pChildren)) {
return false;
}
SNode* pChild = nodesListGetNode(pIndef->node.pChildren, 0);
return QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild) ||
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pGroupTags);
}
static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) {
SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->groupSort = rewriteTailOptNeedGroupSort(pIndef);
TSWAP(pSort->node.pChildren, pIndef->node.pChildren);
pSort->node.precision = pIndef->node.precision;
// tail(expr, [limit, offset,] _rowts)
SFunctionNode* pTail = (SFunctionNode*)nodesListGetNode(pIndef->pFuncs, 0);
int32_t rowtsIndex = LIST_LENGTH(pTail->pParameterList) - 1;
int32_t code = nodesListMakeStrictAppend(
&pSort->pSortKeys, rewriteTailOptCreateOrderByExpr(nodesListGetNode(pTail->pParameterList, rowtsIndex)));
if (TSDB_CODE_SUCCESS == code) {
pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets);
if (NULL == pSort->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pSort;
} else {
nodesDestroyNode((SNode*)pSort);
}
return code;
}
static SNode* rewriteTailOptCreateProjectExpr(SFunctionNode* pTail) {
SNode* pExpr = nodesCloneNode(nodesListGetNode(pTail->pParameterList, 0));
if (NULL == pExpr) {
return NULL;
}
strcpy(((SExprNode*)pExpr)->aliasName, pTail->node.aliasName);
return pExpr;
}
static int32_t rewriteTailOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) {
SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
if (NULL == pProject) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP(pProject->node.pTargets, pIndef->node.pTargets);
pProject->node.precision = pIndef->node.precision;
// tail(expr, [limit, offset,] _rowts)
SFunctionNode* pTail = (SFunctionNode*)nodesListGetNode(pIndef->pFuncs, 0);
int32_t limitIndex = LIST_LENGTH(pTail->pParameterList) > 2 ? 1 : -1;
int32_t offsetIndex = LIST_LENGTH(pTail->pParameterList) > 3 ? 2 : -1;
int32_t code = nodesListMakeStrictAppend(&pProject->pProjections, rewriteTailOptCreateProjectExpr(pTail));
if (TSDB_CODE_SUCCESS == code) {
code = rewriteTailOptCreateLimit(limitIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, limitIndex),
offsetIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, offsetIndex),
&pProject->node.pLimit);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pProject;
} else {
nodesDestroyNode((SNode*)pProject);
}
return code;
}
static int32_t rewriteTailOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
SIndefRowsFuncLogicNode* pIndef) {
SLogicNode* pSort = NULL;
SLogicNode* pProject = NULL;
int32_t code = rewriteTailOptCreateSort(pIndef, &pSort);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteTailOptCreateProject(pIndef, &pProject);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pSort);
pSort->pParent = pProject;
pSort = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pIndef);
} else {
nodesDestroyNode((SNode*)pSort);
nodesDestroyNode((SNode*)pProject);
}
return code;
}
static int32_t rewriteTailOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SIndefRowsFuncLogicNode* pIndef =
(SIndefRowsFuncLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, rewriteTailOptMayBeOptimized);
if (NULL == pIndef) {
return TSDB_CODE_SUCCESS;
}
return rewriteTailOptimizeImpl(pCxt, pLogicSubplan, pIndef);
}
static bool eliminateSetOpMayBeOptimized(SLogicNode* pNode) {
SLogicNode* pParent = pNode->pParent;
if (NULL == pParent ||
QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pParent) && QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pParent) ||
QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pParent) && QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pParent) ||
LIST_LENGTH(pParent->pChildren) < 2) {
return false;
}
if (nodeType(pNode) != nodeType(pNode->pParent) ||
LIST_LENGTH(pNode->pChildren) < 2) {
if (nodeType(pNode) != nodeType(pNode->pParent) || LIST_LENGTH(pNode->pChildren) < 2) {
return false;
}
return true;
}
static int32_t eliminateSetOpOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSetOpNode) {
static int32_t eliminateSetOpOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
SLogicNode* pSetOpNode) {
SNode* pSibling;
FOREACH(pSibling, pSetOpNode->pParent->pChildren) {
if (nodesEqualNode(pSibling, (SNode*)pSetOpNode)) {
SNode* pChild;
FOREACH(pChild, pSetOpNode->pChildren) {
((SLogicNode*)pChild)->pParent = pSetOpNode->pParent;
}
FOREACH(pChild, pSetOpNode->pChildren) { ((SLogicNode*)pChild)->pParent = pSetOpNode->pParent; }
INSERT_LIST(pSetOpNode->pParent->pChildren, pSetOpNode->pChildren);
pSetOpNode->pChildren = NULL;
......@@ -1237,13 +1362,14 @@ static int32_t eliminateSetOpOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
{.pName = "PushDownCondition", .optimizeFunc = pushDownCondOptimize},
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}
};
// clang-format on
......
......@@ -41,8 +41,12 @@ typedef struct SPhysiPlanContext {
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (NULL != pStmtName && '\0' != pStmtName[0]) {
return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
if (NULL != pStmtName) {
if ('\0' != pStmtName[0]) {
return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
} else {
return sprintf(pKey, "%s", pCol->node.aliasName);
}
}
if ('\0' == pCol->tableAlias[0]) {
return sprintf(pKey, "%s", pCol->colName);
......@@ -56,11 +60,13 @@ static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output, bool reserve) {
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
bool output, bool reserve) {
SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
if (NULL == pSlot) {
return NULL;
}
strcpy(pSlot->name, pName);
pSlot->slotId = slotId;
pSlot->dataType = ((SExprNode*)pNode)->resType;
pSlot->reserve = reserve;
......@@ -99,10 +105,8 @@ static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char
return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}
static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, NULL, name);
return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash);
static int32_t putSlotToHash(const char* pName, int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
return putSlotToHashImpl(dataBlockId, slotId, pName, strlen(pName), pHash);
}
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
......@@ -131,9 +135,11 @@ static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SD
int16_t slotId = 0;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true, false));
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
getSlotKey(pNode, NULL, name);
code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
if (TSDB_CODE_SUCCESS == code) {
code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
code = putSlotToHash(name, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
}
if (TSDB_CODE_SUCCESS == code) {
pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
......@@ -196,7 +202,8 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
int32_t len = getSlotKey(pExpr, pStmtName, name);
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
if (NULL == pIndex) {
code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output, reserve));
code =
nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
if (TSDB_CODE_SUCCESS == code) {
code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
}
......@@ -513,12 +520,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
(NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) {
nodesDestroyNode((SNode*)pTableScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableScan->groupSort = pScanLogicNode->groupSort;
pTableScan->interval = pScanLogicNode->interval;
pTableScan->offset = pScanLogicNode->offset;
pTableScan->sliding = pScanLogicNode->sliding;
......@@ -1170,8 +1178,9 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
SPhysiNode** pPhyNode) {
SSortPhysiNode* pSort =
(SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pSortLogicNode,
pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1185,7 +1194,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
}
}
......@@ -1322,6 +1331,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
pMerge->groupSort = pMergeLogicNode->groupSort;
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
......
......@@ -362,7 +362,7 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
}
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
SNodeList* pMergeKeys, SLogicNode* pPartChild) {
SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -371,6 +371,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
pMerge->srcGroupId = pCxt->groupId;
pMerge->node.precision = pPartChild->precision;
pMerge->pMergeKeys = pMergeKeys;
pMerge->groupSort = groupSort;
int32_t code = TSDB_CODE_SUCCESS;
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
......@@ -430,7 +431,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
......@@ -497,12 +498,16 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
return code;
}
static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) {
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
((SScanLogicNode*)pNode)->scanType = scanType;
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
if (NULL != pScan->pGroupTags) {
pScan->groupSort = true;
}
} else {
if (1 == LIST_LENGTH(pNode->pChildren)) {
splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType);
stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
}
}
}
......@@ -515,7 +520,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -524,13 +529,10 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
}
if (TSDB_CODE_SUCCESS == code) {
splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE);
++(pCxt->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
stbSplSetTableMergeScan(pChild);
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
++(pCxt->groupId);
} else {
nodesDestroyList(pMergeKeys);
}
......@@ -560,7 +562,7 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pPartTags;
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
......@@ -775,6 +777,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
pPartSort->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
pPartSort->groupSort = pSort->groupSort;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}
......@@ -789,12 +792,29 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
return code;
}
static void stbSplSetScanPartSort(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (NULL != pScan->pGroupTags) {
pScan->groupSort = true;
}
} else {
if (1 == LIST_LENGTH(pNode->pChildren)) {
stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
}
}
}
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL;
SNodeList* pMergeKeys = NULL;
bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
}
if (TSDB_CODE_SUCCESS == code && groupSort) {
stbSplSetScanPartSort(pPartSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
......@@ -829,7 +849,7 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS
SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan);
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, false);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
......
......@@ -67,6 +67,14 @@ TEST_F(PlanBasicTest, tailFunc) {
run("SELECT TAIL(c1, 10) FROM t1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 PARTITION BY c1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 ORDER BY 1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 LIMIT 5");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 PARTITION BY c1 LIMIT 5");
}
TEST_F(PlanBasicTest, interpFunc) {
......
......@@ -76,6 +76,7 @@ static void parseArg(int argc, char* argv[]) {
static struct option long_options[] = {
{"dump", optional_argument, NULL, 'd'},
{"skipSql", required_argument, NULL, 's'},
{"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'},
{0, 0, 0, 0}
};
......@@ -88,6 +89,9 @@ static void parseArg(int argc, char* argv[]) {
case 's':
setSkipSqlNum(optarg);
break;
case 'i':
setLimitSqlNum(optarg);
break;
case 'l':
setLogLevel(optarg);
break;
......
......@@ -51,6 +51,7 @@ enum DumpModule {
DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
int32_t g_skipSql = 0;
int32_t g_limitSql = 0;
int32_t g_logLevel = 131;
void setDumpModule(const char* pModule) {
......@@ -76,28 +77,33 @@ void setDumpModule(const char* pModule) {
}
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); }
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); }
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); }
int32_t getLogLevel() { return g_logLevel; }
class PlannerTestBaseImpl {
public:
PlannerTestBaseImpl() : sqlNo_(0) {}
PlannerTestBaseImpl() : sqlNo_(0), sqlNum_(0) {}
void useDb(const string& user, const string& db) {
caseEnv_.acctId_ = 0;
caseEnv_.user_ = user;
caseEnv_.db_ = db;
caseEnv_.nsql_ = g_skipSql;
caseEnv_.numOfSkipSql_ = g_skipSql;
caseEnv_.numOfLimitSql_ = g_limitSql;
}
void run(const string& sql) {
++sqlNo_;
if (caseEnv_.nsql_ > 0) {
--(caseEnv_.nsql_);
if (caseEnv_.numOfSkipSql_ > 0) {
--(caseEnv_.numOfSkipSql_);
return;
}
if (caseEnv_.numOfLimitSql_ > 0 && caseEnv_.numOfLimitSql_ == sqlNum_) {
return;
}
++sqlNum_;
reset();
try {
......@@ -134,7 +140,7 @@ class PlannerTestBaseImpl {
}
void prepare(const string& sql) {
if (caseEnv_.nsql_ > 0) {
if (caseEnv_.numOfSkipSql_ > 0) {
return;
}
......@@ -148,7 +154,7 @@ class PlannerTestBaseImpl {
}
void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) {
if (caseEnv_.nsql_ > 0) {
if (caseEnv_.numOfSkipSql_ > 0) {
return;
}
......@@ -161,8 +167,8 @@ class PlannerTestBaseImpl {
}
void exec() {
if (caseEnv_.nsql_ > 0) {
--(caseEnv_.nsql_);
if (caseEnv_.numOfSkipSql_ > 0) {
--(caseEnv_.numOfSkipSql_);
return;
}
......@@ -197,9 +203,10 @@ class PlannerTestBaseImpl {
int32_t acctId_;
string user_;
string db_;
int32_t nsql_;
int32_t numOfSkipSql_;
int32_t numOfLimitSql_;
caseEnv() : nsql_(0) {}
caseEnv() : numOfSkipSql_(0) {}
};
struct stmtEnv {
......@@ -401,6 +408,7 @@ class PlannerTestBaseImpl {
stmtEnv stmtEnv_;
stmtRes res_;
int32_t sqlNo_;
int32_t sqlNum_;
};
PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {}
......
......@@ -43,6 +43,7 @@ class PlannerTestBase : public testing::Test {
extern void setDumpModule(const char* pModule);
extern void setSkipSqlNum(const char* pNum);
extern void setLimitSqlNum(const char* pNum);
extern void setLogLevel(const char* pLogLevel);
extern int32_t getLogLevel();
......
......@@ -188,8 +188,8 @@ class TDTestCase:
def check_tail_table(self , tbname , col_name , tail_rows , offset):
tail_sql = f"select tail({col_name} , {tail_rows} , {offset}) from {tbname}"
equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts"
#equal_sql = f"select {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}"
#equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts"
equal_sql = f"select {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}"
tdSql.query(tail_sql)
tail_result = tdSql.queryResult
......@@ -404,7 +404,7 @@ class TDTestCase:
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.query("select tail(c2,2) from sub1_bound")
tdSql.query("select tail(c2,2) from sub1_bound order by 1 desc")
tdSql.checkRows(2)
tdSql.checkData(0,0,9223372036854775803)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册