提交 2fc5eeb8 编写于 作者: L liuyao

session win range

上级 8867a572
...@@ -56,7 +56,8 @@ typedef struct { ...@@ -56,7 +56,8 @@ typedef struct {
void* pStateBackend; void* pStateBackend;
struct SStorageAPI api; struct SStorageAPI api;
int8_t fillHistory; int8_t fillHistory;
STimeWindow winRange;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
......
...@@ -826,7 +826,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -826,7 +826,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1; return -1;
} }
SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; SReadHandle handle = {.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
...@@ -849,7 +853,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -849,7 +853,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; SReadHandle handle = {.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
......
...@@ -285,6 +285,8 @@ typedef struct SStreamAggSupporter { ...@@ -285,6 +285,8 @@ typedef struct SStreamAggSupporter {
int16_t stateKeyType; int16_t stateKeyType;
SDiskbasedBuf* pResultBuf; SDiskbasedBuf* pResultBuf;
SStateStore stateStore; SStateStore stateStore;
STimeWindow winRange;
SStorageAPI* pSessionAPI;
} SStreamAggSupporter; } SStreamAggSupporter;
typedef struct SWindowSupporter { typedef struct SWindowSupporter {
......
...@@ -2966,7 +2966,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin ...@@ -2966,7 +2966,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
} }
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore) { SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) {
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap; pSup->gap = gap;
...@@ -3008,6 +3008,16 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, ...@@ -3008,6 +3008,16 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
pCtx[i].saveHandle.pBuf = pSup->pResultBuf; pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
} }
if (pHandle) {
pSup->winRange = pHandle->winRange;
// temporary
if (pSup->winRange.ekey <= 0) {
pSup->winRange.ekey = INT64_MAX;
}
}
pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3035,6 +3045,13 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT ...@@ -3035,6 +3045,13 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; } bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; }
bool inWinRange(STimeWindow* range, STimeWindow* cur) {
if (cur->skey >= range->skey && cur->ekey <= range->ekey) {
return true;
}
return false;
}
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
SResultWindowInfo* pCurWin) { SResultWindowInfo* pCurWin) {
pCurWin->sessionWin.groupId = groupId; pCurWin->sessionWin.groupId = groupId;
...@@ -3043,6 +3060,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT ...@@ -3043,6 +3060,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin,
pAggSup->gap, &pCurWin->pOutputBuf, &size); pAggSup->gap, &pCurWin->pOutputBuf, &size);
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->pOutputBuf = taosMemoryMalloc(size);
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true; pCurWin->isOutput = true;
} else { } else {
...@@ -3189,7 +3212,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC ...@@ -3189,7 +3212,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
while (1) { while (1) {
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) { if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
taosMemoryFree(winInfo.pOutputBuf); taosMemoryFree(winInfo.pOutputBuf);
pAPI->stateStore.streamStateFreeCur(pCur); pAPI->stateStore.streamStateFreeCur(pCur);
break; break;
...@@ -3413,8 +3437,12 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3413,8 +3437,12 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SResultWindowInfo childWin = {0}; SResultWindowInfo childWin = {0};
childWin.sessionWin = *pWinKey; childWin.sessionWin = *pWinKey;
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
if (code == TSDB_CODE_SUCCESS && pWinKey->win.skey <= childWin.sessionWin.win.skey &&
childWin.sessionWin.win.ekey <= pWinKey->win.ekey) { if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
continue;
}
if (code == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) {
if (num == 0) { if (num == 0) {
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
...@@ -3678,9 +3706,16 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { ...@@ -3678,9 +3706,16 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
} }
} }
void resetWinRange(STimeWindow* winRange) {
winRange->skey = INT16_MIN;
winRange->skey = INT16_MAX;
}
void streamSessionReloadState(SOperatorInfo* pOperator) { void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
int32_t size = 0; int32_t size = 0;
void* pBuf = NULL; void* pBuf = NULL;
...@@ -3734,7 +3769,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3734,7 +3769,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
} }
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore); pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4024,6 +4059,12 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, ...@@ -4024,6 +4059,12 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false; pCurWin->pStateKey->isNull = false;
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size);
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true; pCurWin->winInfo.isOutput = true;
} else if (pKeyData) { } else if (pKeyData) {
...@@ -4292,6 +4333,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur ...@@ -4292,6 +4333,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
void streamStateReloadState(SOperatorInfo* pOperator) { void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
int32_t size = 0; int32_t size = 0;
void* pBuf = NULL; void* pBuf = NULL;
...@@ -4361,7 +4404,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4361,7 +4404,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
int16_t type = pColNode->node.resType.type; int16_t type = pColNode->node.resType.type;
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore); type, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册