提交 61153b76 编写于 作者: 5 54liuyao

feat(stream): distribute stream session plan

上级 21e9ef32
...@@ -227,6 +227,7 @@ typedef enum ENodeType { ...@@ -227,6 +227,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
......
...@@ -130,13 +130,17 @@ typedef struct SMergeLogicNode { ...@@ -130,13 +130,17 @@ typedef struct SMergeLogicNode {
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType; typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EIntervalAlgorithm { typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1, INTERVAL_ALGO_HASH = 1,
INTERVAL_ALGO_MERGE, INTERVAL_ALGO_MERGE,
INTERVAL_ALGO_STREAM_FINAL, INTERVAL_ALGO_STREAM_FINAL,
INTERVAL_ALGO_STREAM_SEMI, INTERVAL_ALGO_STREAM_SEMI,
INTERVAL_ALGO_STREAM_SINGLE, INTERVAL_ALGO_STREAM_SINGLE,
} EIntervalAlgorithm; SESSION_ALGO_STREAM_SEMI,
SESSION_ALGO_STREAM_FINAL,
SESSION_ALGO_STREAM_SINGLE,
SESSION_ALGO_MERGE,
} EWindowAlgorithm;
typedef struct SWindowLogicNode { typedef struct SWindowLogicNode {
SLogicNode node; SLogicNode node;
...@@ -153,7 +157,7 @@ typedef struct SWindowLogicNode { ...@@ -153,7 +157,7 @@ typedef struct SWindowLogicNode {
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
double filesFactor; double filesFactor;
EIntervalAlgorithm intervalAlgo; EWindowAlgorithm windowAlgo;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SFillLogicNode { typedef struct SFillLogicNode {
...@@ -371,6 +375,8 @@ typedef struct SSessionWinodwPhysiNode { ...@@ -371,6 +375,8 @@ typedef struct SSessionWinodwPhysiNode {
} SSessionWinodwPhysiNode; } SSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode; typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode { typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window; SWinodwPhysiNode window;
......
...@@ -845,8 +845,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -845,8 +845,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
......
...@@ -4728,18 +4728,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4728,18 +4728,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, int32_t children = 0;
.calTrigger = pSessionNode->window.triggerType}; pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); int32_t children = 1;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
......
...@@ -2425,12 +2425,15 @@ int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, Sql ...@@ -2425,12 +2425,15 @@ int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, Sql
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo)); return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
} }
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { int32_t numOfCols = 0;
int32_t code = TSDB_CODE_OUT_OF_MEMORY; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
...@@ -2453,12 +2456,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx ...@@ -2453,12 +2456,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
} }
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols); initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = (STimeWindowAggSupp) {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN};
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId; pInfo->primaryTsIndex = tsSlotId;
pInfo->gap = gap; pInfo->gap = pSessionNode->gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
...@@ -2960,25 +2965,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2960,25 +2965,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SExecTaskInfo* pTaskInfo) { SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = NULL;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, numOfCols, pResBlock, gap,
tsSlotId, pTwAggSupp, pTaskInfo);
if (pOperator == NULL) { if (pOperator == NULL) {
goto _error; goto _error;
} }
pOperator->name = "StreamFinalSessionWindowAggOperator"; pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*)); pInfo->pChildren = taosArrayInit(8, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) { for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild = SOperatorInfo* pChild =
createStreamSessionAggOperatorInfo(NULL, pExprInfo, numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo); createStreamSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo);
if (pChild == NULL) { if (pChild == NULL) {
goto _error; goto _error;
} }
...@@ -2988,7 +2988,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -2988,7 +2988,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); destroyStreamSessionAggOperatorInfo(pInfo, pOperator->numOfExprs);
} }
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
......
...@@ -427,7 +427,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD ...@@ -427,7 +427,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor); COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(intervalAlgo); COPY_SCALAR_FIELD(windowAlgo);
return (SNode*)pDst; return (SNode*)pDst;
} }
...@@ -538,6 +538,12 @@ static SNode* physiIntervalCopy(const SIntervalPhysiNode* pSrc, SIntervalPhysiNo ...@@ -538,6 +538,12 @@ static SNode* physiIntervalCopy(const SIntervalPhysiNode* pSrc, SIntervalPhysiNo
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* physiSessionCopy(const SSessionWinodwPhysiNode* pSrc, SSessionWinodwPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(window, physiWindowCopy);
COPY_SCALAR_FIELD(gap);
return (SNode*)pDst;
}
static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId); COPY_SCALAR_FIELD(dataBlockId);
CLONE_NODE_LIST_FIELD(pSlots); CLONE_NODE_LIST_FIELD(pSlots);
...@@ -678,6 +684,9 @@ SNode* nodesCloneNode(const SNode* pNode) { ...@@ -678,6 +684,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst); return physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst);
default: default:
break; break;
} }
......
...@@ -246,6 +246,10 @@ const char* nodesNodeName(ENodeType type) { ...@@ -246,6 +246,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSessionWindow"; return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "PhysiStreamSessionWindow"; return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "PhysiStreamSemiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "PhysiStreamFinalSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return "PhysiStateWindow"; return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
...@@ -3998,6 +4002,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -3998,6 +4002,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiFillNodeToJson(pObj, pJson); return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionWindowNodeToJson(pObj, pJson); return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
...@@ -4131,6 +4137,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -4131,6 +4137,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiFillNode(pJson, pObj); return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return jsonToPhysiSessionWindowNode(pJson, pObj); return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
......
...@@ -287,6 +287,10 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -287,6 +287,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSessionWinodwPhysiNode)); return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode)); return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return makeNode(type, sizeof(SStateWinodwPhysiNode)); return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
...@@ -804,6 +808,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -804,6 +808,7 @@ void nodesDestroyNode(SNode* pNode) {
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break; break;
......
...@@ -548,6 +548,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW ...@@ -548,6 +548,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION; pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol); pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol);
if (NULL == pWindow->pTspk) { if (NULL == pWindow->pTspk) {
...@@ -572,7 +573,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva ...@@ -572,7 +573,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit = pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->intervalAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->pTspk = nodesCloneNode(pInterval->pCol); pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) { if (NULL == pWindow->pTspk) {
......
...@@ -977,8 +977,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* ...@@ -977,8 +977,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
return code; return code;
} }
static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
switch (intervalAlgo) { switch (windowAlgo) {
case INTERVAL_ALGO_HASH: case INTERVAL_ALGO_HASH:
return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
case INTERVAL_ALGO_MERGE: case INTERVAL_ALGO_MERGE:
...@@ -989,6 +989,14 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { ...@@ -989,6 +989,14 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
case INTERVAL_ALGO_STREAM_SINGLE: case INTERVAL_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
case SESSION_ALGO_STREAM_FINAL:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
case SESSION_ALGO_STREAM_SEMI:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
case SESSION_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
case SESSION_ALGO_MERGE:
return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
default: default:
break; break;
} }
...@@ -998,7 +1006,7 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { ...@@ -998,7 +1006,7 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode( SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->intervalAlgo)); pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
if (NULL == pInterval) { if (NULL == pInterval) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -1015,8 +1023,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil ...@@ -1015,8 +1023,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode( SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode, pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
if (NULL == pSession) { if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -376,8 +376,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ...@@ -376,8 +376,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -400,8 +400,8 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf ...@@ -400,8 +400,8 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI; ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -421,8 +421,29 @@ static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -421,8 +421,29 @@ static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo)
} }
} }
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR; if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitSessionForStream(pCxt, pInfo);
} else {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
} }
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册