提交 3043c7f0 编写于 作者: L liuyao

refactor operator

上级 57a020ed
...@@ -456,7 +456,6 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -456,7 +456,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex; int32_t pullIndex;
SSDataBlock* pPullDataRes; SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren; SArray* pChildren;
int32_t numOfChild; int32_t numOfChild;
SStreamState* pState; // void SStreamState* pState; // void
...@@ -507,7 +506,6 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -507,7 +506,6 @@ typedef struct SStreamSessionAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved; bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
......
...@@ -26,7 +26,8 @@ ...@@ -26,7 +26,8 @@
#include "tlog.h" #include "tlog.h"
#include "ttime.h" #include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" #define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
...@@ -231,7 +232,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa ...@@ -231,7 +232,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = {0}; STimeWindow win = {0};
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
win.skey = startTsCols[i]; win.skey = startTsCols[i];
win.ekey = endTsCols[i]; win.ekey = endTsCols[i];
} else { } else {
...@@ -746,14 +747,14 @@ static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int6 ...@@ -746,14 +747,14 @@ static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int6
pTaskInfo->streamInfo.checkPointId = ckId; pTaskInfo->streamInfo.checkPointId = ckId;
} }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap) { SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfOutput = pSup->numOfExprs; int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1; int32_t step = 1;
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
...@@ -767,14 +768,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -767,14 +768,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
int32_t startPos = 0; int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = {0}; STimeWindow nextWin = {0};
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
nextWin = getFinalTimeWindow(ts, &pInfo->interval); nextWin = getFinalTimeWindow(ts, &pInfo->interval);
} else { } else {
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
} }
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) { if (startPos < 0) {
break; break;
...@@ -782,7 +783,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -782,7 +783,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue; continue;
} }
if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) { if (IS_FINAL_INTERVAL_OP(pOperator) && pInfo->numOfChild > 0) {
bool ignore = true; bool ignore = true;
SWinKey winRes = { SWinKey winRes = {
.ts = nextWin.skey, .ts = nextWin.skey,
...@@ -825,7 +826,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -825,7 +826,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} }
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
forwardRows = 1; forwardRows = 1;
} else { } else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
...@@ -866,7 +867,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -866,7 +867,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
} }
} }
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
} else { } else {
startPos = startPos =
...@@ -910,7 +911,7 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { ...@@ -910,7 +911,7 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) {
static char* getStreamOpName(uint16_t opType) { static char* getStreamOpName(uint16_t opType) {
switch (opType) { switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "single interval"; return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final"; return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
...@@ -921,16 +922,18 @@ static char* getStreamOpName(uint16_t opType) { ...@@ -921,16 +922,18 @@ static char* getStreamOpName(uint16_t opType) {
} }
static SSDataBlock* getResult(SOperatorInfo* pOperator){ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType; uint16_t opType = pOperator->operatorType;
if (IS_FINAL_INTERVAL_OP(pOperator)) {
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) { if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
}
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
...@@ -941,7 +944,7 @@ static SSDataBlock* getResult(SOperatorInfo* pOperator){ ...@@ -941,7 +944,7 @@ static SSDataBlock* getResult(SOperatorInfo* pOperator){
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
return NULL; return NULL;
...@@ -955,12 +958,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -955,12 +958,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = getResult(pOperator); SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) { if (resBlock != NULL) {
return resBlock; return resBlock;
} }
...@@ -971,12 +974,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -971,12 +974,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_INTERVAL_OP(pOperator)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo); clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
qDebug("===stream===clear semi operator"); qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
} else { } else {
if (pInfo->twAggSup.maxTs > 0 && if (pInfo->twAggSup.maxTs > 0 &&
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
...@@ -984,15 +987,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -984,15 +987,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
} }
qDebug("===stream===interval final close"); qDebug("stask:%s ===stream===%s close", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
} }
return NULL; return NULL;
} else { } else {
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_INTERVAL_OP(pOperator)) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes; return pInfo->pDelRes;
} }
} }
...@@ -1024,12 +1027,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1024,12 +1027,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", pInfo->numOfDatapack);
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
break; break;
} }
pInfo->numOfDatapack++; pInfo->numOfDatapack++;
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo)); printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
...@@ -1037,7 +1040,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1037,7 +1040,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
int32_t chId = getChildIndex(pBlock); int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId); addRetriveWindow(delWins, pInfo, chId);
if (pBlock->info.type != STREAM_CLEAR) { if (pBlock->info.type != STREAM_CLEAR) {
...@@ -1053,7 +1056,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1053,7 +1056,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
pInfo->pDelRes->info.type = STREAM_CLEAR; pInfo->pDelRes->info.type = STREAM_CLEAR;
} else { } else {
...@@ -1063,17 +1066,17 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1063,17 +1066,17 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_INTERVAL_OP(pOperator)) {
pInfo->recvGetAll = true; pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_INTERVAL_OP(pOperator)) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
if (taosArrayGetSize(pInfo->pUpdated) > 0) { if (taosArrayGetSize(pInfo->pUpdated) > 0) {
break; break;
} }
continue; continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_INTERVAL_OP(pOperator)) {
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator); processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
...@@ -1094,7 +1097,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1094,7 +1097,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
} }
...@@ -1114,7 +1117,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1114,7 +1117,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
return getResult(pOperator); return buildIntervalResult(pOperator);
} }
static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
...@@ -1253,15 +1256,13 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -1253,15 +1256,13 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pInfo->isFinal = true;
pOperator->name = "StreamFinalIntervalOperator"; pOperator->name = "StreamFinalIntervalOperator";
} else { } else {
// semi interval operator does not catch result // semi interval operator does not catch result
pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator"; pOperator->name = "StreamSemiIntervalOperator";
} }
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
} }
...@@ -2004,12 +2005,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2004,12 +2005,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) { if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
return pBInfo->pRes; return pBInfo->pRes;
} }
...@@ -2030,7 +2031,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2030,7 +2031,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo)); printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
...@@ -2038,7 +2039,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2038,7 +2039,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// gap must be 0 // gap must be 0
doDeleteTimeWindows(pAggSup, pBlock, pWins); doDeleteTimeWindows(pAggSup, pBlock, pWins);
removeSessionResults(pInfo->pStUpdated, pWins); removeSessionResults(pInfo->pStUpdated, pWins);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_SESSION_OP(pOperator)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
...@@ -2064,8 +2065,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2064,8 +2065,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo), true); doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator), true);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_SESSION_OP(pOperator)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child // if chIndex + 1 - size > 0, add new child
...@@ -2108,13 +2109,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2108,13 +2109,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) { if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
return pBInfo->pRes; return pBInfo->pRes;
} }
...@@ -2240,7 +2241,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2240,7 +2241,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
...@@ -2408,9 +2408,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -2408,9 +2408,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pOperator->operatorType = pPhyNode->type;
pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION); char* name = getStreamOpName(pOperator->operatorType);
char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
...@@ -2421,7 +2420,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -2421,7 +2420,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = pPhyNode->type;
if (numOfChild > 0) { if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) { for (int32_t i = 0; i < numOfChild; i++) {
...@@ -2436,7 +2434,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -2436,7 +2434,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
} }
} }
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
} }
...@@ -3153,7 +3151,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3153,7 +3151,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
pInfo->isFinal = false;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
...@@ -3194,7 +3191,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3194,7 +3191,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pPullWins = NULL; // SPullWindowInfo pInfo->pPullWins = NULL; // SPullWindowInfo
pInfo->pullIndex = 0; pInfo->pullIndex = 0;
pInfo->pPullDataRes = NULL; pInfo->pPullDataRes = NULL;
pInfo->isFinal = false;
pInfo->numOfChild = 0; pInfo->numOfChild = 0;
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0; pInfo->delKey.groupId = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册