提交 e8e68fab 编写于 作者: X Xiaoyu Wang

feat: physical plan add clone interface

上级 c2477b4e
......@@ -207,8 +207,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
......@@ -218,11 +218,11 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
......
......@@ -133,12 +133,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pPrjNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_JOIN: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
pPhysiChildren = pJoinNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_AGG: {
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
pPhysiChildren = pAggNode->node.pChildren;
break;
......@@ -158,12 +158,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pIntNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
pPhysiChildren = pSessNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
pPhysiChildren = pStateNode->window.node.pChildren;
break;
......@@ -513,7 +513,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_JOIN: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
......@@ -553,7 +553,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_AGG: {
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
......@@ -744,7 +744,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
......@@ -782,7 +782,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT,
......
......@@ -3936,7 +3936,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
......@@ -4582,7 +4582,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit,
pProjPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......@@ -4662,7 +4662,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
......@@ -4674,7 +4674,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
......@@ -4694,7 +4694,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
......@@ -4706,9 +4706,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......@@ -5100,12 +5100,12 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) {
return TDB_CODE_SUCCESS;
}
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo) {
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDeleterParam *pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -5116,7 +5116,7 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo *pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
}
......@@ -5130,7 +5130,6 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT
return TSDB_CODE_SUCCESS;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
......
......@@ -13,20 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo *Info, SLogicConditionNode* pLogicConditionNode);
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
......@@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
......@@ -54,7 +54,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
} else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
extractTimeCondition(pInfo, (SLogicConditionNode*) pOnCondition);
extractTimeCondition(pInfo, (SLogicConditionNode*)pOnCondition);
}
pOperator->fpSet =
......@@ -66,7 +66,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator;
_error:
_error:
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -180,10 +180,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL;
}
static void extractTimeCondition(SJoinOperatorInfo *pInfo, SLogicConditionNode* pLogicConditionNode) {
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) {
int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList);
for(int32_t i = 0; i < len; ++i) {
for (int32_t i = 0; i < len; ++i) {
SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i);
if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pn1 = (SOperatorNode*)pNode;
......
......@@ -42,10 +42,11 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo *pInfo);
static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
bool processBlockWithProbability(const SSampleExecInfo *pInfo) {
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
if (pInfo->sampleRatio == 1) {
return true;
......@@ -261,7 +262,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock);
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
pBlock);
}
int64_t st = taosGetTimestampMs();
......@@ -295,7 +297,8 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}
void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) {
void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock) {
// currently only the tbname pseudo column
if (numOfPseudoExpr == 0) {
return;
......@@ -391,10 +394,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
pBlock->info.groupId = *groupId;
}else if(len != 0){
} else if (len != 0) {
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
}
......@@ -483,7 +486,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
qDebug("%s start to repeat descending order scan data blocks due to query func required",
GET_TASKID(pTaskInfo));
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
......@@ -525,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
taosArrayDestroy(pTableScanInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
......@@ -562,11 +566,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio= pTableScanNode->ratio;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
......@@ -604,7 +608,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->cost.openCost = 0;
return pOperator;
_error:
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -723,16 +727,19 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW;
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX,};
STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
};
bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex);
......@@ -759,7 +766,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
int32_t size = taosArrayGetSize(pWins);
if (pInfo->scanWinIndex < size) {
win = *(STimeWindow *)taosArrayGet(pWins, pInfo->scanWinIndex);
win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
pInfo->scanWinIndex++;
needRead = true;
} else {
......@@ -790,8 +797,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
return pResult;
}
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible,
SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
SSDataBlock* pUpdateBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
......@@ -859,8 +866,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return pInfo->pUpdateRes;
} else {
if (isStateWindow(pInfo) &&
taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo);
......@@ -974,8 +980,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -992,7 +998,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->pColMatchInfo =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
......@@ -1025,8 +1032,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
}
if (isSmaStream(pTableScanNode->triggerType)) {
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval,
pTableScanNode->filesFactor);
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTableScanNode->filesFactor);
}
if (pSTInfo->interval.interval > 0 && pDataReader) {
......@@ -1059,8 +1065,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL,
NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
......@@ -1445,8 +1451,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen,
pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info
doFilterResult(pInfo);
......@@ -1519,7 +1525,8 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
return numOfRows;
}
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1567,7 +1574,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
return pOperator;
_error:
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -1687,16 +1694,16 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);
char *data = NULL;
if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
data = tTagValToData((const STagVal *)p, false);
}else {
char* data = NULL;
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
data = tTagValToData((const STagVal*)p, false);
} else {
data = (char*)p;
}
colDataAppend(pDst, count, data, (data == NULL));
if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL
&& IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data != NULL){
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) {
taosMemoryFree(data);
}
}
......@@ -1726,7 +1733,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1743,7 +1751,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);;
pInfo->pRes = createResDataBlock(pDescNode);
;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
......
......@@ -1822,7 +1822,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
......@@ -1874,7 +1874,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
......@@ -2137,7 +2137,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
pInfo->pChildren = NULL;
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
......@@ -2624,7 +2624,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
goto _error;
}
pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan
pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*));
......@@ -3015,7 +3015,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pChildren = NULL;
pOperator->name = "StreamStateAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->numOfExprs = numOfCols;
......
......@@ -456,6 +456,49 @@ static SNode* physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
return (SNode*)pDst;
}
static SNode* physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pScanCols);
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
COPY_SCALAR_FIELD(uid);
COPY_SCALAR_FIELD(suid);
COPY_SCALAR_FIELD(tableType);
COPY_OBJECT_FIELD(tableName, sizeof(SName));
return (SNode*)pDst;
}
static SNode* physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
return physiScanCopy(pSrc, pDst);
}
static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_OBJECT_FIELD(scanSeq[0], sizeof(uint8_t) * 2);
COPY_OBJECT_FIELD(scanRange, sizeof(STimeWindow));
COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
return (SNode*)pDst;
}
static SNode* physiSysTableScanCopy(const SSystemTableScanPhysiNode* pSrc, SSystemTableScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_OBJECT_FIELD(mgmtEpSet, sizeof(SEpSet));
COPY_SCALAR_FIELD(showRewrite);
COPY_SCALAR_FIELD(accountId);
return (SNode*)pDst;
}
static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pExprs);
......@@ -603,6 +646,14 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicIndefRowsFuncCopy((const SIndefRowsFuncLogicNode*)pNode, (SIndefRowsFuncLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanCopy((const STagScanPhysiNode*)pNode, (STagScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return physiTableScanCopy((const STableScanPhysiNode*)pNode, (STableScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return physiSysTableScanCopy((const SSystemTableScanPhysiNode*)pNode, (SSystemTableScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
......
......@@ -220,9 +220,9 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSystemTableScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return "PhysiJoin";
case QUERY_NODE_PHYSICAL_PLAN_AGG:
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange";
......@@ -242,13 +242,13 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiStreamSemiInterval";
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return "PhysiFill";
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return "PhysiStreamStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return "PhysiPartition";
......@@ -3875,9 +3875,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiSysTableScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return physiProjectNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return physiJoinNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
......@@ -3893,11 +3893,11 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
......@@ -4008,9 +4008,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiSysTableScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return jsonToPhysiProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return jsonToPhysiJoinNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
......@@ -4026,11 +4026,11 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
......
......@@ -467,7 +467,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_JOIN: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......@@ -478,7 +478,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_AGG: {
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......@@ -518,12 +518,12 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode;
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......
......@@ -255,9 +255,9 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSystemTableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_AGG:
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
......@@ -277,13 +277,13 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return makeNode(type, sizeof(SFillPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
......@@ -657,14 +657,14 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(pPhyNode->pProjections);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_JOIN: {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyList(pPhyNode->pTargets);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_AGG: {
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pPhyNode = (SAggPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
......@@ -689,8 +689,8 @@ void nodesDestroyNode(SNodeptr pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
......
......@@ -556,7 +556,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
SJoinPhysiNode* pJoin =
(SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN);
(SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -738,7 +738,8 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
SPhysiNode** pPhyNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
SAggPhysiNode* pAgg =
(SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -996,8 +997,7 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
: QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1009,10 +1009,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SStateWinodwPhysiNode* pState =
(SStateWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
: QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW));
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
if (NULL == pState) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -340,7 +340,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code;
}
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
......@@ -363,7 +363,7 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn
return code;
}
static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
......@@ -379,14 +379,30 @@ static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitI
return code;
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitWindowNodeForStream(pCxt, pInfo);
return stbSplSplitIntervalForStream(pCxt, pInfo);
} else {
return stbSplSplitWindowNodeForBatch(pCxt, pInfo);
return stbSplSplitIntervalForBatch(pCxt, pInfo);
}
}
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL:
return stbSplSplitInterval(pCxt, pInfo);
case WINDOW_TYPE_SESSION:
return stbSplSplitSession(pCxt, pInfo);
default:
break;
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
SNodeList* pFunc = pMergeAgg->pAggFuncs;
pMergeAgg->pAggFuncs = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册