提交 ab2b1bbf 编写于 作者: 5 54liuyao

feat(stream): sliding window

上级 55b94558
...@@ -108,6 +108,7 @@ typedef struct SDataBlockInfo { ...@@ -108,6 +108,7 @@ typedef struct SDataBlockInfo {
// TODO: optimize and remove following // TODO: optimize and remove following
int32_t childId; // used for stream, do not serialize int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
......
...@@ -389,6 +389,7 @@ typedef struct SStreamScanInfo { ...@@ -389,6 +389,7 @@ typedef struct SStreamScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex; int32_t deleteDataIndex;
STimeWindow updateWin;
// status for tmq // status for tmq
// SSchemaWrapper schema; // SSchemaWrapper schema;
......
...@@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
pBlock->info.blockId = pNode->dataBlockId; pBlock->info.blockId = pNode->dataBlockId;
pBlock->info.type = STREAM_INVALID; pBlock->info.type = STREAM_INVALID;
pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
......
...@@ -884,6 +884,28 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ ...@@ -884,6 +884,28 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
return true; return true;
} }
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval,
TSDB_ORDER_ASC);
STimeWindow endWin = win;
STimeWindow preWin = win;
while (1) {
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
do {
preWin = endWin;
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
} while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
endWin = preWin;
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) {
win.ekey = endWin.ekey;
return win;
}
win.ekey = endWin.ekey;
}
}
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
...@@ -905,10 +927,13 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t ...@@ -905,10 +927,13 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += pInfo->updateWin.skey = tsCols[*pRowIndex];
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); win = getSlidingWindow(tsCols, &pInfo->interval, &pSDB->info, pRowIndex);
pInfo->updateWin.ekey = tsCols[*pRowIndex - 1];
// win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
// (*pRowIndex) +=
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
} else if (isStateWindow(pInfo)) { } else if (isStateWindow(pInfo)) {
...@@ -974,10 +999,12 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_ ...@@ -974,10 +999,12 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
} }
} }
if (!pResult) { if (!pResult) {
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
return NULL; return NULL;
} }
if (pResult->info.groupId == pInfo->groupId) { if (pResult->info.groupId == pInfo->groupId) {
pResult->info.calWin = pInfo->updateWin;
return pResult; return pResult;
} }
} }
...@@ -1256,6 +1283,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1256,6 +1283,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
// TODO move into scan // TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_RETRIEVE: { case STREAM_RETRIEVE: {
...@@ -1533,6 +1562,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1533,6 +1562,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pStreamScanOp = pOperator; pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->pDeleteDataRes = createPullDataBlock();
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pOperator->name = "StreamScanOperator"; pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
...@@ -419,6 +419,14 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx ...@@ -419,6 +419,14 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true; return true;
} }
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
if (pInterval->interval != pInterval->sliding && (pWin->ekey < pBlockInfo->calWin.skey ||
pWin->skey > pBlockInfo->calWin.ekey) ) {
return false;
}
return true;
}
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) { TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
bool ascQuery = (order == TSDB_ORDER_ASC); bool ascQuery = (order == TSDB_ORDER_ASC);
...@@ -432,6 +440,10 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, ...@@ -432,6 +440,10 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
return -1; return -1;
} }
if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
return -1;
}
TSKEY skey = ascQuery ? pNext->skey : pNext->ekey; TSKEY skey = ascQuery ? pNext->skey : pNext->ekey;
int32_t startPos = 0; int32_t startPos = 0;
...@@ -801,7 +813,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -801,7 +813,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
...@@ -834,7 +846,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -834,7 +846,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
} }
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order); pBlock->info.rows, numOfOutput, pInfo->order);
...@@ -916,7 +928,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo ...@@ -916,7 +928,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) { if (tsCols != NULL) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
} }
} }
...@@ -1279,17 +1291,23 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1279,17 +1291,23 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
pGpDatas = (uint64_t*)pGpCol->pData; pGpDatas = (uint64_t*)pGpCol->pData;
} }
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t startPos = 0;
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); while (1) {
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
if (pUpWins && res) { if (pUpWins && res) {
SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
taosArrayPush(pUpWins, &winRes); taosArrayPush(pUpWins, &winRes);
} }
int32_t prevEndPos = step - 1 + startPos;
startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
} }
} }
...@@ -2434,7 +2452,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2434,7 +2452,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreExpiredData && isClosed) { if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) { if (startPos < 0) {
break; break;
...@@ -3101,12 +3119,7 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) { ...@@ -3101,12 +3119,7 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) {
} }
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
int64_t sGap = ts - pWin->skey + gap; if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) {
int64_t eGap = pWin->ekey - ts + gap;
// if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) {
// return true;
// }
if (sGap >= 0 && eGap >= 0) {
return true; return true;
} }
return false; return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册