提交 caa45033 编写于 作者: L liuyao

reset state key memory

上级 7e896221
......@@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool compareStateKey(void* data, void* key) {
if (!data || !key) {
return true;
return false;
}
SStateKeys* stateKey = (SStateKeys*)key;
stateKey->pData = (char*)key + sizeof(SStateKeys);
......@@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size);
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false;
}
if (code == TSDB_CODE_SUCCESS) {
......@@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
}
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
pNextWin->winInfo.pOutputBuf = NULL;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
int32_t nextSize = pAggSup->resultRowSize;
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize);
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
} else {
pNextWin->pStateKey =
(SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pNextWin->pStateKey->type = pAggSup->stateKeyType;
pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys);
pNextWin->pStateKey->isNull = false;
pNextWin->winInfo.isOutput = true;
}
pAggSup->stateStore.streamStateFreeCur(pCur);
}
......@@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo curWin = {0};
SStateWindowInfo nextWin = {0};
setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
if (IS_VALID_SESSION_WIN(nextWin.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore);
}
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
pAggSup->pResultRows, pSeUpdated, pStDeleted);
......@@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
saveResult(curInfo.winInfo, pInfo->pStUpdated);
}
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
}
if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
}
}
taosMemoryFree(pBuf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册