提交 270771e3 编写于 作者: 5 54liuyao

feat(stream): stream final session operator

上级 109df430
......@@ -110,3 +110,4 @@ contrib/*
!contrib/CMakeLists.txt
!contrib/test
sql
debug*/
......@@ -214,6 +214,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
......
......@@ -465,6 +465,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SAggSupporter aggSup; // aggregate supporter
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -581,6 +582,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pDelRes;
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
......@@ -722,7 +724,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
......@@ -797,7 +799,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
......
......@@ -5162,7 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir) {
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
......
......@@ -833,15 +833,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
SSDataBlock* pDB = getDataFromCatch(pInfo);
if (pDB != NULL) {
return pDB;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
}
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
pOperator->status = OP_EXEC_DONE;
......@@ -849,17 +840,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
if (pBlock->info.type == STREAM_REPROCESS) {
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else {
int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0);
if (code != TDB_CODE_SUCCESS) {
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
return pBlock;
return taosArrayGetP(pInfo->pBlockLists, current);
} else {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
......@@ -1023,7 +1004,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -1067,7 +1067,8 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
}
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) {
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
......@@ -1079,6 +1080,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
if (pUpWins) {
taosArrayPush(pUpWins, &win);
}
}
}
......@@ -1119,7 +1123,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0,
pOperator->numOfExprs, pBlock);
pOperator->numOfExprs, pBlock, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
}
......@@ -1154,6 +1158,15 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyIntervalOperatorInfo(pChildOp->info, numOfOutput);
taosMemoryFreeClear(pChildOp->info);
taosMemoryFreeClear(pChildOp);
}
}
}
bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
......@@ -1228,32 +1241,38 @@ _error:
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock,
keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
int32_t numOfChild = 8;// Todo(liuyao) get it from phy plan
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
for (int32_t i = 0; i < numOfChild; i++) {
SSDataBlock* chRes = createOneDataBlock(pResBlock, false);
SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols,
chRes, pInterval, primaryTsSlotId, pTwAggSupp, NULL, pTaskInfo);
if (pChildOp && chRes) {
taosArrayPush(pInfo->pChildren, &pChildOp);
continue;
}
goto _error;
}
pOperator->name = "StreamFinalIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
......@@ -1703,6 +1722,51 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
return pUpdated;
}
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) {
return pInfo->pChildren != NULL;
}
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) {
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
}
}
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArray *pWinArray,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray);
ASSERT(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SIntervalAggOperatorInfo* pChInfo = pChildOp->info;
SResultRow* pChResult = NULL;
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult,
0, pChInfo->binfo.pCtx, pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset,
&pChInfo->aggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
}
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
......@@ -1726,10 +1790,26 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) {
SArray *pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock, pUpWins);
if (isFinalInterval(pInfo)) {
int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperator->numOfExprs, pOperator->pTaskInfo);
}
taosArrayDestroy(pUpWins);
continue;
}
if (isFinalInterval(pInfo)) {
int32_t chIndex = 1; //Todo(liuyao) get it from SSDataBlock
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
doStreamIntervalAgg(pChildOp);
}
pUpdated = doHashInterval(pOperator, pBlock, 0);
}
......@@ -1752,6 +1832,16 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo *pChild = taosArrayGetP(pInfo->pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
taosMemoryFreeClear(pChild);
taosMemoryFreeClear(pChInfo);
}
}
}
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
......@@ -1780,6 +1870,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pI
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo =
taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -1789,7 +1880,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
initResultSizeInfo(pOperator, 4096);
int32_t code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1820,6 +1911,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL;
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
......@@ -2068,24 +2160,6 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
return size - startIndex - 1;
}
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) {
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
}
}
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex);
......@@ -2164,7 +2238,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
}
static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* pBinfo,
SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap) {
SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap, SArray* result) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
......@@ -2173,7 +2247,11 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput);
if (result) {
taosArrayPush(result, pCurWin);
}
}
}
......@@ -2215,6 +2293,42 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
}
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray *pWinArray,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray);
ASSERT(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SArray* pChWins = pChInfo->streamAggSup.pResultRows;
int32_t chWinSize = taosArrayGetSize(pChWins);
int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey,
TSDB_ORDER_DESC, getSessionWindowEndkey);
for (int32_t k = index; k > 0 && k < chWinSize; k++) {
SResultWindowInfo* pcw = taosArrayGet(pChWins, k);
if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) {
SResultRow* pChResult = NULL;
setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId,
numOfOutput, pChInfo->binfo.rowCellInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
continue;
}
break;
}
}
}
}
bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) {
return pInfo->pChildren != NULL;
}
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -2247,10 +2361,25 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) {
SArray *pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0,
pOperator->numOfExprs, pInfo->gap);
pOperator->numOfExprs, pInfo->gap, pWins);
if (isFinalSession(pInfo)) {
int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0,
pChildOp->numOfExprs, pChildInfo->gap, NULL);
rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo);
}
taosArrayDestroy(pWins);
continue;
}
if (isFinalSession(pInfo)) {
int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL);
}
doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted);
}
......@@ -2271,3 +2400,39 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
pInfo->streamAggSup.pResultBuf);
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = NULL;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo,
numOfCols, pResBlock, gap, tsSlotId, pTwAggSupp, pTaskInfo);
if (pOperator == NULL) {
goto _error;
}
pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW;
int32_t numOfChild = 1; //Todo(liuyao) get it from phy plan
pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void *));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild = createStreamSessionAggOperatorInfo(NULL, pExprInfo,
numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo);
if (pChild == NULL) {
goto _error;
}
taosArrayPush(pInfo->pChildren, &pChild);
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册