提交 955838d1 编写于 作者: 5 54liuyao

feat(stream): optimize disc buff

上级 83fcf4fd
...@@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo { ...@@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo {
typedef struct STimeWindowAggSupp { typedef struct STimeWindowAggSupp {
int8_t calTrigger; int8_t calTrigger;
int64_t waterMark; int64_t waterMark;
int64_t deleteMark;
TSKEY maxTs; TSKEY maxTs;
TSKEY minTs; TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
...@@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult); int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize); int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
}; };
char* value = NULL; char* value = NULL;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
/*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/
/*value = taosMemoryCalloc(1, size);*/ tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
/*}*/
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR ...@@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize); streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock ...@@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
} }
} }
} }
releaseOutputBuf(pTaskInfo, &key, pRow);
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pTaskInfo, &key, pRow);
} }
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* ...@@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj*
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
} }
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
} }
...@@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { ...@@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
} }
} }
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark; return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
} }
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }
bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag) { int32_t scanFlag) {
...@@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, ...@@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
return true; return true;
} }
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key);
return true;
}
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins, SHashObj* pUpdatedMap) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
if (pUpWins && res) {
taosArrayPush(pUpWins, &winRes);
}
if (pUpdatedMap) {
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
}
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
}
}
}
bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) { bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) {
size_t bytes = sizeof(TSKEY); size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
...@@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) ...@@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
// window has been closed // window has been closed
return false; return false;
} }
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true; return true;
} }
...@@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup ...@@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key;
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
STimeWindow win = {
.skey = pWinKey->ts,
.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
};
if (isCloseWindow(&win, pTwSup)) {
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
for (int32_t i = 0; i < size; i++) {
qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qDebug("===stream===close window %" PRId64, pWinKey->ts);
}
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
if (needDeleteWindowBuf(&win, pTwSup)) {
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey);
}
}
}
return TSDB_CODE_SUCCESS;
}
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren); int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
...@@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
SInterval* pInterval = &iaInfo->interval; SInterval* pInterval = &iaInfo->interval;
int32_t startPos = 0; int32_t startPos = 0;
int64_t* tsCols = extractTsCol(pBlock, iaInfo); int64_t* tsCols = extractTsCol(pBlock, iaInfo);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
...@@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
win.skey = miaInfo->curTs; win.skey = miaInfo->curTs;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret); T_LONG_JMP(pTaskInfo->env, ret);
} }
...@@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
startPos = currPos; startPos = currPos;
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret); T_LONG_JMP(pTaskInfo->env, ret);
} }
...@@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pMiaInfo->prefetchedBlock = pBlock; pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes); cleanupAfterGroupResultGen(pMiaInfo, pRes);
break; break;
} else { } else {
// continue // continue
} }
} }
...@@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table ...@@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); // finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* prevWin = &prevGrpWin->window; STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); // finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode); tdListPopNode(miaInfo->groupIntervals, listNode);
} }
} }
...@@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if (listNode != NULL) { if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = grpWin->groupId; pRes->info.groupId = grpWin->groupId;
} }
} }
...@@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* ...@@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap); saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
...@@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* ...@@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
.ts = nextWin.skey, .ts = nextWin.skey,
.groupId = tableGroupId, .groupId = tableGroupId,
}; };
saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize); saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pTaskInfo, &key, pResult); releaseOutputBuf(pTaskInfo, &key, pResult);
int32_t prevEndPos = (forwardRows - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
...@@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
// doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
qDebug("===stream===single interval is done"); qDebug("===stream===single interval is done");
...@@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single interval recv"); printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, // doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
pUpdatedMap); // pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
...@@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey); minTs = TMIN(minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); // doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// new disc buf // new disc buf
/*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
...@@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
#endif #endif
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); pOperator);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
...@@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
taosArraySort(pUpdated, resultrowComparAsc); taosArraySort(pUpdated, resultrowComparAsc);
// new disc buf // new disc buf
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); // finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated,
// pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
...@@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); // doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
// new disc buf // new disc buf
// doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
...@@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = INT64_MAX,
}; };
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
......
...@@ -135,15 +135,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* ...@@ -135,15 +135,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
if (streamStateGet(pState, key, pVal, pVLen) == 0) { if (streamStateGet(pState, key, pVal, pVLen) == 0) {
return 0; return 0;
} }
void* tmp = taosMemoryCalloc(1, size); *pVal = tdbRealloc(NULL, size);
if (streamStatePut(pState, key, &tmp, size) == 0) { memset(*pVal, 0, size);
taosMemoryFree(tmp); return 0;
int32_t code = streamStateGet(pState, key, pVal, pVLen);
ASSERT(code == 0);
return code;
}
taosMemoryFree(tmp);
return -1;
} }
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
...@@ -191,9 +185,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key ...@@ -191,9 +185,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
...@@ -212,9 +211,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key ...@@ -212,9 +211,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c; int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册