提交 8992721e 编写于 作者: L liuyao

refact child cache

上级 e42caca9
......@@ -579,6 +579,7 @@ typedef struct SStreamIntervalOperatorInfo {
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
int32_t numOfChild;
SStreamState* pState;
SWinKey delKey;
uint64_t numOfDatapack;
......
......@@ -1541,14 +1541,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
streamFileStateDestroy(pInfo->pState->pFileState);
taosMemoryFreeClear(pInfo->pState);
if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyOperatorInfo(pChildOp);
}
taosArrayDestroy(pInfo->pChildren);
}
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
......@@ -2081,58 +2073,6 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos
return TSDB_CODE_SUCCESS;
}
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t size = taosArrayGetSize(pWinArray);
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SExprSupp* pSup = &pOperator->exprSupp;
if (!pInfo->pChildren) {
return;
}
for (int32_t i = 0; i < size; i++) {
SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SRowBuffPos* pCurResPos = NULL;
SResultRow* pCurResult = NULL;
STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) {
continue;
}
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
int32_t num = 0;
for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
continue;
}
if (num == 0) {
int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup);
ASSERT(pCurResPos != NULL);
pCurResult = (SResultRow*)pCurResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
num++;
SRowBuffPos* pChResPos = NULL;
SResultRow* pChResult = NULL;
setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx,
pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
pChResult = (SResultRow*)pChResPos->pRowBuff;
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
}
if (num > 0 && pUpdatedMap) {
saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pCurResPos, pUpdatedMap);
saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
}
}
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
......@@ -2250,9 +2190,8 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild);
}
}
}
......@@ -2413,7 +2352,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue;
}
if (IS_FINAL_OP(pInfo) && pInfo->pChildren) {
if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) {
bool ignore = true;
SWinKey winRes = {
.ts = nextWin.skey,
......@@ -2425,8 +2364,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild);
}
} else {
int32_t index = -1;
......@@ -2780,24 +2718,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
streamStateSetNumber(pInfo->pState, -1);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
if (!pInfo->pChildren) {
goto _error;
}
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) {
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
taosArrayPush(pInfo->pChildren, &pChildOp);
streamStateSetNumber(pChInfo->pState, i);
continue;
}
goto _error;
}
}
pInfo->numOfChild = numOfChild;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
......@@ -3337,13 +3258,13 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t numOfOutput = pSup->numOfExprs;
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
int32_t numOfChild = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pWinArray, i);
int32_t num = 0;
SResultWindowInfo parentWin = {0};
for (int32_t j = 0; j < numOfChildren; j++) {
for (int32_t j = 0; j < numOfChild; j++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
......@@ -4995,7 +4916,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pullIndex = 0;
pInfo->pPullDataRes = NULL;
pInfo->isFinal = false;
pInfo->pChildren = NULL;
pInfo->numOfChild = 0;
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pInfo->numOfDatapack = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册