未验证 提交 c30c53c3 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22120 from taosdata/fix/TD-25268

remove redundant results
......@@ -2914,6 +2914,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pWinBlock);
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
taosArrayDestroy(pInfo->historyWins);
......@@ -3008,14 +3009,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
if (pHandle) {
pSup->winRange = pHandle->winRange;
// temporary
if (pSup->winRange.ekey <= 0) {
pSup->winRange.ekey = INT64_MAX;
}
}
pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS;
......@@ -3063,11 +3056,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->pOutputBuf = taosMemoryMalloc(size);
pCurWin->pOutputBuf = taosMemoryCalloc(1, size);
}
if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin);
} else {
pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs;
......@@ -3198,7 +3192,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
}
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
SSHashObj* pStDeleted) {
SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
......@@ -3222,7 +3216,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
if (addGap) {
winDelta = pAggSup->gap;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
......@@ -3240,6 +3234,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -3253,8 +3248,13 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SResultRow* pResult = NULL;
int32_t rows = pSDataBlock->info.rows;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
......@@ -3266,7 +3266,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < rows;) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
......@@ -3291,7 +3290,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted);
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
saveSessionOutputBuf(pAggSup, &winInfo);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
......@@ -3455,7 +3454,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
} else {
break;
......@@ -3707,8 +3706,8 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
}
void resetWinRange(STimeWindow* winRange) {
winRange->skey = INT16_MIN;
winRange->skey = INT16_MAX;
winRange->skey = INT64_MIN;
winRange->ekey = INT64_MAX;
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
......@@ -3724,10 +3723,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pStUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
saveSessionOutputBuf(pAggSup, &winInfo);
saveResult(winInfo, pInfo->pStUpdated);
}
taosMemoryFree(pBuf);
......@@ -4019,6 +4024,7 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
taosMemoryFreeClear(param);
}
......@@ -4073,6 +4079,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
} else if (pKeyData) {
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
......@@ -4145,8 +4152,13 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
......@@ -4155,7 +4167,6 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
return;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t rows = pSDataBlock->info.rows;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
......@@ -4335,7 +4346,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) {
......@@ -4344,11 +4355,10 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
taosMemoryFree(pNextWin->pOutputBuf);
saveSessionOutputBuf(pAggSup, pCurWin);
}
void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
......@@ -4360,14 +4370,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
saveResult(curInfo.winInfo, pInfo->pStUpdated);
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
}
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
......@@ -4855,7 +4870,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
......
......@@ -1864,7 +1864,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
taosMemoryFreeClear(*pVal);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
taosMemoryFreeClear(*pVal);
......@@ -1880,7 +1879,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}
......@@ -1938,14 +1936,12 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
......@@ -1961,7 +1957,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册