提交 87bcbe00 编写于 作者: 5 54liuyao

feat(stream): stream state&session support partition by

上级 c53c6992
...@@ -365,7 +365,9 @@ typedef struct SCatchSupporter { ...@@ -365,7 +365,9 @@ typedef struct SCatchSupporter {
} SCatchSupporter; } SCatchSupporter;
typedef struct SStreamAggSupporter { typedef struct SStreamAggSupporter {
SArray* pResultRows; SHashObj* pResultRows;
SArray* pCurWins;
int32_t valueSize;
int32_t keySize; int32_t keySize;
char* pKeyBuf; // window key buffer char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
...@@ -899,9 +901,9 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary ...@@ -899,9 +901,9 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size); SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted); int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
......
...@@ -4814,6 +4814,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4814,6 +4814,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
}; };
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type); bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
...@@ -5498,14 +5499,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo ...@@ -5498,14 +5499,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
} }
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
size_t size) { int32_t size) {
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
pSup->pResultRows = taosArrayInit(1024, size); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) { if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pSup->valueSize = size;
pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow)); pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
......
...@@ -716,7 +716,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -716,7 +716,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
int64_t gap = pInfo->sessionSup.gap; int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup->pResultRows, tsCols[pInfo->updateResIndex], gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
pInfo->updateResIndex += pInfo->updateResIndex +=
updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
......
...@@ -1320,24 +1320,23 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1320,24 +1320,23 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL); doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL && } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue; continue;
} }
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
} }
...@@ -2038,7 +2037,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2038,7 +2037,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs, doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs,
...@@ -2058,12 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2058,12 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) && } else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo)) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue; continue;
} }
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (isFinalInterval(pInfo)) { if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -2125,6 +2123,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2125,6 +2123,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
}; };
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
...@@ -2190,8 +2189,13 @@ _error: ...@@ -2190,8 +2189,13 @@ _error:
} }
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
taosArrayDestroy(pSup->pResultRows);
taosMemoryFreeClear(pSup->pKeyBuf); taosMemoryFreeClear(pSup->pKeyBuf);
void **pIte = NULL;
while ((pIte = taosHashIterate(pSup->pResultRows, pIte)) != NULL) {
SArray *pWins = (SArray *) (*pIte);
taosArrayDestroy(pWins);
}
taosHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf); destroyDiskbasedBuf(pSup->pResultBuf);
} }
...@@ -2333,7 +2337,22 @@ static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) { ...@@ -2333,7 +2337,22 @@ static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
return taosArrayPush(pWinInfos, &win); return taosArrayPush(pWinInfos, &win);
} }
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex) { SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
void** ite = taosHashGet(pAggSup->pResultRows, &groupId, sizeof(uint64_t));
SArray* pWinInfos = NULL;
if (ite == NULL) {
pWinInfos = taosArrayInit(1024, pAggSup->valueSize);
taosHashPut(pAggSup->pResultRows, &groupId, sizeof(uint64_t), &pWinInfos, sizeof(void *));
} else {
pWinInfos = *ite;
}
return pWinInfos;
}
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
if (size == 0) { if (size == 0) {
*pIndex = 0; *pIndex = 0;
...@@ -2389,7 +2408,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes ...@@ -2389,7 +2408,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
assert(pWinInfo->win.skey <= pWinInfo->win.ekey); assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query // too many time window in query
int32_t size = taosArrayGetSize(pAggSup->pResultRows); int32_t size = taosArrayGetSize(pAggSup->pCurWins);
if (size > MAX_INTERVAL_TIME_WINDOW) { if (size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -2449,25 +2468,6 @@ static int32_t doOneStateWindowAgg(SStreamStateAggOperatorInfo* pInfo, SSDataBlo ...@@ -2449,25 +2468,6 @@ static int32_t doOneStateWindowAgg(SStreamStateAggOperatorInfo* pInfo, SSDataBlo
pSDataBlock, pCurWin, pResult, startIndex, winRows, numOutput, pTaskInfo); pSDataBlock, pCurWin, pResult, startIndex, winRows, numOutput, pTaskInfo);
} }
int32_t copyWinInfoToDataBlock(SSDataBlock* pBlock, SStreamAggSupporter* pAggSup, int32_t start, int32_t num,
int32_t numOfExprs, SOptrBasicInfo* pBinfo) {
for (int32_t i = start; i < num; i += 1) {
SResultWindowInfo* pWinInfo = taosArrayGet(pAggSup->pResultRows, start);
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pWinInfo->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pWinInfo->pos.offset);
for (int32_t j = 0; j < numOfExprs; ++j) {
SResultRowEntryInfo* pResultInfo = getResultCell(pRow, j, pBinfo->rowCellInfoOffset);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
char* in = GET_ROWCELL_INTERBUF(pBinfo->pCtx[j].resultInfo);
colDataAppend(pColInfoData, pBlock->info.rows, in, pResultInfo->isNullRes);
}
pBlock->info.rows += pRow->numOfRows;
releaseBufPage(pAggSup->pResultBuf, bufPage);
}
blockDataUpdateTsWindow(pBlock, -1);
return TSDB_CODE_SUCCESS;
}
int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) { int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) {
SResultWindowInfo* pCurWin = taosArrayGet(pWinInfos, startIndex); SResultWindowInfo* pCurWin = taosArrayGet(pWinInfos, startIndex);
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
...@@ -2484,15 +2484,15 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) ...@@ -2484,15 +2484,15 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex); SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset,
&pInfo->streamAggSup, pTaskInfo); &pInfo->streamAggSup, pTaskInfo);
num += startIndex + 1; num += startIndex + 1;
ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pResultRows)); ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pCurWins));
// Just look for the window behind StartIndex // Just look for the window behind StartIndex
for (int32_t i = startIndex + 1; i < num; i++) { for (int32_t i = startIndex + 1; i < num; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pResultRows, i); SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pCurWins, i);
SResultRow* pWinResult = NULL; SResultRow* pWinResult = NULL;
setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset,
&pInfo->streamAggSup, pTaskInfo); &pInfo->streamAggSup, pTaskInfo);
...@@ -2503,7 +2503,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -2503,7 +2503,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
pWinInfo->isOutput = false; pWinInfo->isOutput = false;
} }
taosArrayRemove(pInfo->streamAggSup.pResultRows, i); taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
} }
} }
...@@ -2533,7 +2533,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2533,7 +2533,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex);
winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
...@@ -2543,7 +2543,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2543,7 +2543,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows, // forwardRows,
// pInfo->order, false); // pInfo->order, false);
int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap); int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap);
if (winNum > 0) { if (winNum > 0) {
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted); compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted);
} }
...@@ -2566,7 +2566,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* ...@@ -2566,7 +2566,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL); step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap)); ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput);
...@@ -2627,7 +2627,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin ...@@ -2627,7 +2627,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SArray* pChWins = pChInfo->streamAggSup.pResultRows; SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId);
int32_t chWinSize = taosArrayGetSize(pChWins); int32_t chWinSize = taosArrayGetSize(pChWins);
int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey);
for (int32_t k = index; k > 0 && k < chWinSize; k++) { for (int32_t k = index; k > 0 && k < chWinSize; k++) {
...@@ -2651,36 +2651,44 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); ...@@ -2651,36 +2651,44 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed,
__get_win_info_ fn) { __get_win_info_ fn) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
int32_t size = taosArrayGetSize(pWins); void **pIte = NULL;
for (int32_t i = 0; i < size; i++) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* pWin = taosArrayGet(pWins, i); SArray *pWins = (SArray *) (*pIte);
SResultWindowInfo* pSeWin = fn(pWin); int32_t size = taosArrayGetSize(pWins);
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { for (int32_t i = 0; i < size; i++) {
if (!pSeWin->isClosed) { void* pWin = taosArrayGet(pWins, i);
pSeWin->isClosed = true; SResultWindowInfo* pSeWin = fn(pWin);
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); if (!pSeWin->isClosed) {
pSeWin->isOutput = true; pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true;
}
} }
continue;
} }
continue; break;
} }
break;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) { int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) {
int32_t size = taosArrayGetSize(pWins); void **pIte = NULL;
for (int32_t i = 0; i < size; i++) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* pWin = taosArrayGet(pWins, i); SArray *pWins = (SArray *) (*pIte);
SResultWindowInfo* pSeWin = fn(pWin); int32_t size = taosArrayGetSize(pWins);
if (!pSeWin->isClosed) { for (int32_t i = 0; i < size; i++) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); void* pWin = taosArrayGet(pWins, i);
pSeWin->isOutput = true; SResultWindowInfo* pSeWin = fn(pWin);
if (!pSeWin->isClosed) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true;
}
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2714,8 +2722,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2714,8 +2722,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0, pOperator->numOfExprs, pInfo->gap, pWins); doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0, pOperator->numOfExprs, pInfo->gap, pWins);
...@@ -2729,12 +2736,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2729,12 +2736,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} }
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL && } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo);
continue; continue;
} }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (isFinalSession(pInfo)) { if (isFinalSession(pInfo)) {
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
...@@ -2873,7 +2881,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { ...@@ -2873,7 +2881,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
return pKeyData && compareVal(pKeyData, &pWin->stateKey); return pKeyData && compareVal(pKeyData, &pWin->stateKey);
} }
SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pIndex) { SStateWindowInfo* getStateWindowByTs(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getStateWinTsKey); int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getStateWinTsKey);
SStateWindowInfo* pWin = NULL; SStateWindowInfo* pWin = NULL;
...@@ -2896,7 +2906,10 @@ SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pInde ...@@ -2896,7 +2906,10 @@ SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pInde
return NULL; return NULL;
} }
SStateWindowInfo* getStateWindow(SArray* pWinInfos, TSKEY ts, char* pKeyData, SColumn* pCol, int32_t* pIndex) { SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts,
uint64_t groupId, char* pKeyData, SColumn* pCol, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
if (size == 0) { if (size == 0) {
*pIndex = 0; *pIndex = 0;
...@@ -2987,16 +3000,16 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc ...@@ -2987,16 +3000,16 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
char* pKeyData = colDataGetData(pKeyColInfo, i); char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0; int32_t winIndex = 0;
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup->pResultRows, tsCol[i], &winIndex); SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], pBlock->info.groupId, &winIndex);
if (!pCurWin) { if (!pCurWin) {
continue; continue;
} }
step = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual,
pSeDeleted); pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pResultRows, winIndex); deleteWindow(pAggSup->pCurWins, winIndex);
} }
} }
...@@ -3026,13 +3039,15 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -3026,13 +3039,15 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char* pKeyData = colDataGetData(pKeyColInfo, i); char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0; int32_t winIndex = 0;
bool allEqual = true; bool allEqual = true;
SStateWindowInfo* pCurWin = getStateWindow(pAggSup->pResultRows, tsCols[i], pKeyData, &pInfo->stateCol, &winIndex); SStateWindowInfo* pCurWin =
winRows = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData,
&allEqual, pInfo->pSeDeleted); &pInfo->stateCol, &winIndex);
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo,
pSDataBlock->info.rows, i, &allEqual, pInfo->pSeDeleted);
if (!allEqual) { if (!allEqual) {
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pResultRows, winIndex); deleteWindow(pAggSup->pCurWins, winIndex);
continue; continue;
} }
code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pTaskInfo); code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pTaskInfo);
...@@ -3079,17 +3094,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3079,17 +3094,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted); pSeUpdated, pInfo->pSeDeleted);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL && } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo);
continue; continue;
} }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册