diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 2a40976a8b9a21e47ca49b0d62193f2c8fcff306..31c4ec41eab34a02d6bde9a50c60c423812dac6b 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -58,15 +58,15 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
   SWinKey* pWin1 = (SWinKey*)pKey1;
   SWinKey* pWin2 = (SWinKey*)pKey2;
 
-  if (pWin1->groupId > pWin2->groupId) {
+  if (pWin1->ts > pWin2->ts) {
     return 1;
-  } else if (pWin1->groupId < pWin2->groupId) {
+  } else if (pWin1->ts < pWin2->ts) {
     return -1;
   }
 
-  if (pWin1->ts > pWin2->ts) {
+  if (pWin1->groupId > pWin2->groupId) {
     return 1;
-  } else if (pWin1->ts < pWin2->ts) {
+  } else if (pWin1->groupId < pWin2->groupId) {
     return -1;
   }
 
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index b8d6e7aa8e534e257b2f8cad1aa3c57bd158e9a1..8e142d3bca4013329cdd3ca39fd8c728b24fad06 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -109,7 +109,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
 int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
 void    streamFreeVal(void* val);
 
-SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
 SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
 SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
 SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key);
diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h
index 9d8a4ae18682c6ad74d5927f46bb4cba71a965c7..fd9849d194de118982fc613e58c45dd912e2a25d 100644
--- a/include/libs/stream/tstreamFileState.h
+++ b/include/libs/stream/tstreamFileState.h
@@ -47,7 +47,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
 bool    hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
 
 SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
-int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize);
+int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
 int32_t recoverSnapshot(SStreamFileState* pFileState);
 
 #ifdef __cplusplus
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 17b9ebb65b744d6189b4cd9425e0296ed7e07b6c..761d712a3aaf987a83bf2d52119ff7dac9aa6663 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -197,7 +197,7 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf
 char     tsUdfdLdLibPath[512] = "";
 bool     tsDisableStream = false;
 int64_t  tsStreamBufferSize = 128 * 1024 * 1024;
-int64_t  tsCheckpointInterval = 24 * 60 * 60 * 1000;
+int64_t  tsCheckpointInterval = 3 * 60 * 60 * 1000;
 
 #ifndef _STORAGE
 int32_t taosSetTfsCfg(SConfig *pCfg) {
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index b06eaca6b286023a537734508595cad1e62027d7..74f642271ad933139e32910ae466eec5f80cbaa0 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -426,6 +426,8 @@ typedef struct STimeWindowAggSupp {
   int64_t         waterMark;
   TSKEY           maxTs;
   TSKEY           minTs;
+  TSKEY           checkPointTs;
+  TSKEY           checkPointInterval;
   SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
 } STimeWindowAggSupp;
 
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 14521df69f4f14960293990952dd1326b462d910..5daf02b212c7ec2dab23c5d247081428ff08554b 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -869,22 +869,8 @@ static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
 }
 
 int32_t compareWinKey(void* pKey, void* data, int32_t index) {
-  SArray*  res = (SArray*)data;
-  SWinKey* pDataPos = taosArrayGet(res, index);
-  SWinKey* pWKey = (SWinKey*)pKey;
-
-  if (pWKey->groupId > pDataPos->groupId) {
-    return 1;
-  } else if (pWKey->groupId < pDataPos->groupId) {
-    return -1;
-  }
-
-  if (pWKey->ts > pDataPos->ts) {
-    return 1;
-  } else if (pWKey->ts < pDataPos->ts) {
-    return -1;
-  }
-  return 0;
+  void* pDataPos = taosArrayGet((SArray*)data, index);
+  return winKeyCmprImpl(pKey, pDataPos);
 }
 
 static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
@@ -1497,30 +1483,6 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
       break;
     }
   }
-
-  // for debug
-  if (qDebugFlag & DEBUG_DEBUG && mark > 0) {
-    SStreamStateCur* pCur = streamStateGetCur(pState, key);
-    int32_t          code = streamStateCurPrev(pState, pCur);
-    if (code == TSDB_CODE_SUCCESS) {
-      SWinKey tmpKey = {0};
-      code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
-      if (code == TSDB_CODE_SUCCESS) {
-        STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
-        qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
-               tw.ekey, tmpKey.groupId, mark);
-      } else {
-        STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
-        qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
-               key->groupId, mark);
-      }
-    } else {
-      STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
-      qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
-             key->groupId, mark);
-    }
-    streamStateFreeCur(pCur);
-  }
 }
 
 static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
@@ -2641,7 +2603,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
       } else {
         deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
                               &pInfo->interval, &pInfo->delKey);
-        streamStateCommit(pTaskInfo->streamInfo.pState);
+        // Written as maxTs > last + interval: subtracting from a maxTs that is still INT64_MIN would overflow.
+        if (pInfo->twAggSup.maxTs > pInfo->twAggSup.checkPointTs + pInfo->twAggSup.checkPointInterval) {
+          streamStateCommit(pInfo->pState);
+          pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
+        }
       }
       return NULL;
     } else {
@@ -2833,6 +2799,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
       .deleteMark = getDeleteMark(pIntervalPhyNode),
       .deleteMarkSaved = 0,
       .calTriggerSaved = 0,
+      .checkPointTs = 0,
+      .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
   };
   ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
   pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
@@ -4887,7 +4855,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
     deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
                           &pInfo->delKey);
     setOperatorCompleted(pOperator);
-    streamStateCommit(pTaskInfo->streamInfo.pState);
+    // Written as maxTs > last + interval: maxTs is initialised to INT64_MIN and subtracting from it would overflow.
+    if (pInfo->twAggSup.maxTs > pInfo->twAggSup.checkPointTs + pInfo->twAggSup.checkPointInterval) {
+      streamStateCommit(pInfo->pState);
+      pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
+    }
     return NULL;
   }
 
@@ -5000,7 +4972,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
   SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
   SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
 
-  SInterval interval = {
+  pInfo->interval = (SInterval) {
       .interval = pIntervalPhyNode->interval,
       .sliding = pIntervalPhyNode->sliding,
       .intervalUnit = pIntervalPhyNode->intervalUnit,
@@ -5009,19 +4981,19 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
       .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
   };
 
-  STimeWindowAggSupp twAggSupp = {
+  pInfo->twAggSup = (STimeWindowAggSupp) {
       .waterMark = pIntervalPhyNode->window.watermark,
       .calTrigger = pIntervalPhyNode->window.triggerType,
       .maxTs = INT64_MIN,
       .minTs = INT64_MAX,
       .deleteMark = getDeleteMark(pIntervalPhyNode),
+      .checkPointTs = 0,
+      .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
   };
 
-  ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
+  ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
 
   pOperator->pTaskInfo = pTaskInfo;
-  pInfo->interval = interval;
-  pInfo->twAggSup = twAggSupp;
   pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
   pInfo->isFinal = false;
 
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 47c91ae090420179aa34408c5c0b885b7d016142..a73c2587a7e16b0758fe21029e8a2219c78489c8 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -247,6 +247,8 @@ int32_t streamStateBegin(SStreamState* pState) {
 
 int32_t streamStateCommit(SStreamState* pState) {
 #ifdef USE_ROCKSDB
+  SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
+  flushSnapshot(pState->pFileState, pShot, true);
   return 0;
 #else
   if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
@@ -410,26 +412,6 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
   return 0;
 }
 
-SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
-#ifdef USE_ROCKSDB
-  return streamStateGetCur_rocksdb(pState, key);
-#else
-  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
-  if (pCur == NULL) return NULL;
-  tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
-
-  int32_t   c = 0;
-  SStateKey sKey = {.key = *key, .opNum = pState->number};
-  tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
-  if (c != 0) {
-    streamStateFreeCur(pCur);
-    return NULL;
-  }
-  pCur->number = pState->number;
-  return pCur;
-#endif
-}
-
 SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
 #ifdef USE_ROCKSDB
   return streamStateFillGetCur_rocksdb(pState, key);
diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c
index 85b674debee947087ce742594e8cc407f96780ca..a3b7cd9b512bb994c162c201b3b87e3e67085b04 100644
--- a/source/libs/stream/src/tstreamFileState.c
+++ b/source/libs/stream/src/tstreamFileState.c
@@ -71,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFu
   pFileState->curRowCount = 0;
   pFileState->deleteMark = delMark;
   pFileState->flushMark = -1;
+  recoverSnapshot(pFileState);
   return pFileState;
 
 _error:
@@ -152,7 +153,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
       i++;
     }
   }
-  flushSnapshot(pFileState->pFileStore, pFlushList, pFileState->rowSize);
+  flushSnapshot(pFileState, pFlushList, false);
   return TSDB_CODE_SUCCESS;
 }
 
@@ -272,7 +273,33 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
   return pFileState->usedBuffs;
 }
 
-int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize) {
+// Restores the persisted flush mark from a buffer written by streamFileStateEncode().
+void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) {
+  // Leave flushMark untouched when the stored value is missing or too short to hold a TSKEY.
+  if (pBuff == NULL || len < (int32_t)sizeof(TSKEY)) {
+    return;
+  }
+  pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark);
+}
+
+// Serializes the flush mark into a freshly allocated buffer; the caller owns *pVal and must free it.
+// On allocation failure *pVal is NULL and *pLen is 0.
+void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
+  *pLen = sizeof(TSKEY);
+  *pVal = taosMemoryCalloc(1, *pLen);
+  if (*pVal == NULL) {
+    *pLen = 0;
+    return;
+  }
+  // The encoder takes the write position by address, presumably so it can move it past the bytes
+  // it wrote. Give it a scratch cursor so *pVal keeps pointing at the start of the allocation.
+  void* pCursor = *pVal;
+  taosEncodeFixedI64(&pCursor, pFileState->flushMark);
+}
+
+// Writes every row of pSnapshot to the backing store; when flushState is true the flush mark is
+// persisted too. Returns the first error encountered, or TSDB_CODE_SUCCESS.
+int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
   int32_t   code = TSDB_CODE_SUCCESS;
   SListIter iter = {0};
   tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
@@ -280,11 +307,74 @@ int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize)
   SListNode* pNode = NULL;
   while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
     SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
-    code = streamStatePut_rocksdb(pFile, pPos->pKey, pPos->pRowBuff, rowSize);
+    code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
+  }
+  if (flushState) {
+    int32_t len = 0;
+    void*   buff = NULL;
+    int32_t markCode = TSDB_CODE_FAILED;
+    streamFileStateEncode(pFileState, &buff, &len);
+    if (buff != NULL) {
+      SWinKey key = {.ts = -1, .groupId = 0};  // reserved key under which the flush mark is stored
+      markCode = streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
+      // NOTE(review): assumes the put copies the value, so the buffer can be released here - confirm.
+      taosMemoryFree(buff);
+    }
+    if (code == TSDB_CODE_SUCCESS) {
+      code = markCode;
+    }
   }
   return code;
 }
 
+// Reloads state persisted by flushSnapshot(): first the flush mark, then rows read through a cursor
+// positioned with streamStateSeekLast() and stepped with streamStateCurPrev_rocksdb(). Returns
+// TSDB_CODE_FAILED if the mark or the cursor is unavailable, the seek error, or TSDB_CODE_SUCCESS.
 int32_t recoverSnapshot(SStreamFileState* pFileState) {
+  int32_t code = TSDB_CODE_SUCCESS;
+  SWinKey stkey = {.ts = -1, .groupId = 0};  // reserved key holding the persisted flush mark
+  void*   pStVal = NULL;
+  int32_t len = 0;
+  code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
+  if (code != TSDB_CODE_SUCCESS) {
+    return TSDB_CODE_FAILED;
+  }
+  streamFileStateDecode(pFileState, pStVal, len);
+  // NOTE(review): assumes the getter returns a buffer owned by the caller, as the row loop below does.
+  taosMemoryFree(pStVal);
+
+  SWinKey          key = {.groupId = 0, .ts = 0};
+  SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
+  if (!pCur) {
+    return TSDB_CODE_FAILED;
+  }
+  code = streamStateSeekLast(pFileState->pFileStore, pCur);
+  if (code != TSDB_CODE_SUCCESS) {
+    streamStateFreeCur(pCur);
+    return code;
+  }
+  while (code == TSDB_CODE_SUCCESS) {
+    if (pFileState->curRowCount == pFileState->maxRowCount) {
+      break;
+    }
+    void*        pVal = NULL;
+    int32_t      pVLen = 0;
+    SRowBuffPos* pNewPos = getNewRowPos(pFileState);
+    code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void **)&pVal, &pVLen);
+    if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
+      destroyRowBuffPos(pNewPos);
+      taosMemoryFree(pVal);  // the value may already have been fetched when the row is rejected
+      break;
+    }
+    memcpy(pNewPos->pRowBuff, pVal, pVLen);
+    taosMemoryFree(pVal);
+    code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->rowSize, &pNewPos, POINTER_BYTES);
+    if (code != TSDB_CODE_SUCCESS) {
+      destroyRowBuffPos(pNewPos);
+      break;
+    }
+    code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur);
+  }
+  streamStateFreeCur(pCur);
   return TSDB_CODE_SUCCESS;
 }
\ No newline at end of file