From 1f63859e218976248b4a2611cc58ae85c98d4ebb Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 7 Apr 2023 12:50:59 +0800 Subject: [PATCH] feat:add buff swap --- include/libs/stream/tstreamFileState.h | 3 +- source/common/src/tglobal.c | 4 +- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/stream/src/streamState.c | 4 +- source/libs/stream/src/tstreamFileState.c | 97 +++++++++++++------ tests/script/tsim/stream/basic1.sim | 2 + tests/script/tsim/stream/basic4.sim | 75 ++++++++++++++ 7 files changed, 154 insertions(+), 35 deletions(-) create mode 100644 tests/script/tsim/stream/basic4.sim diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index fd9849d194..1962e41b6c 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -37,13 +37,14 @@ typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); +void releaseRowBuffPos(SRowBuffPos* pBuff); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3a488d3399..f69a1721b2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -828,7 +828,9 @@ static int32_t taosSetServerCfg(SConfig 
*pCfg) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - tsDisableStream = cfgGetItem(pCfg, "disableStream")->i64; + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; + tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; + tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64; GRANT_CFG_GET; return 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5daf02b212..8b5750dbcf 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2881,7 +2881,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pOperator->operatorType = pPhyNode->type; @@ -5042,7 +5042,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a73c2587a7..b11ef1b640 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -319,7 +319,9 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { } int32_t 
streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { - return getRowBuffByPos(pState->pFileState, pos, pVal); + int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal); + releaseRowBuffPos(pos); + return code; } // todo refactor diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 381e0c55f7..b2621a32c5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -22,6 +22,7 @@ #define FLUSH_RATIO 0.2 +#define FLUSH_NUM 4 #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024); struct SStreamFileState { @@ -43,7 +44,7 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) { +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -62,12 +63,13 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFu if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { goto _error; } + pFileState->keyLen = keySize; pFileState->rowSize = rowSize; pFileState->preCheckPointVersion = 0; pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; - pFileState->maxRowCount = memSize / rowSize; + pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = -1; @@ -90,7 +92,9 @@ void destroyRowBuffPosPtr(void* ptr) { return; } SRowBuffPos* pPos = *(SRowBuffPos**)ptr; - destroyRowBuffPos(pPos); + if (!pPos->beUsed) { + destroyRowBuffPos(pPos); + } } void destroyRowBuff(void* ptr) { @@ -117,13 +121,14 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { SListNode* pNode = NULL; while ((pNode = 
tdListNext(&iter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); - if (all || (pFileState->getTs(pPos->pKey) < ts)) { - tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(pNode); + if (all || (pFileState->getTs(pPos->pKey) < ts) ) { + ASSERT(pPos->pRowBuff != NULL); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); destroyRowBuffPos(pPos); + tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(pNode); } } } @@ -133,27 +138,47 @@ void streamFileStateClear(SStreamFileState* pFileState) { clearExpiredRowBuff(pFileState, 0, true); } -int32_t flushRowBuff(SStreamFileState* pFileState) { - SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES); - if (!pFlushList) { - return TSDB_CODE_OUT_OF_MEMORY; - } - uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); +void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); SListNode* pNode = NULL; - while ((pNode = tdListNext(&iter)) != NULL && i < num) { + while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - if (!pPos->beUsed) { + if (pPos->beUsed == used) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(pNode); i++; } } +} + +int32_t flushRowBuff(SStreamFileState* pFileState) { + SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES); + if (!pFlushList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); + num = TMAX(num, FLUSH_NUM); + popUsedBuffs(pFileState, pFlushList, num, false); + if (isListEmpty(pFlushList)) { +
popUsedBuffs(pFileState, pFlushList, num, true); + } flushSnapshot(pFileState, pFlushList, false); + SListIter fIter = {0}; + tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD); + SListNode* pNode = NULL; + while ((pNode = tdListNext(&fIter)) != NULL) { + SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + ASSERT(pPos->pRowBuff != NULL); + tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff); + pPos->pRowBuff = NULL; + } + tdListFreeP(pFlushList, destroyRowBuffPosPtr); return TSDB_CODE_SUCCESS; } @@ -178,11 +203,11 @@ void* getFreeBuff(SList* lists, int32_t buffSize) { SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); - tdListAppend(pFileState->usedBuffs, &pPos); + pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen); void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); if (pBuff) { pPos->pRowBuff = pBuff; - return pPos; + goto _end; } if (pFileState->curRowCount < pFileState->maxRowCount) { @@ -190,13 +215,17 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { if (pBuff) { pPos->pRowBuff = pBuff; pFileState->curRowCount++; - return pPos; + goto _end; } } int32_t code = clearRowBuff(pFileState); ASSERT(code == 0); pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + +_end: + tdListAppend(pFileState->usedBuffs, &pPos); + ASSERT(pPos->pRowBuff != NULL); return pPos; } @@ -204,23 +233,24 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); if (pos) { - if (pVal) { - *pVLen = pFileState->rowSize; - *pVal = *pos; - } + *pVLen = pFileState->rowSize; + *pVal = *pos; + (*pos)->beUsed = true; return TSDB_CODE_SUCCESS; } SRowBuffPos* pNewPos = getNewRowPos(pFileState); - ASSERT(pNewPos);// todo(liuyao) delete - pNewPos->pKey = taosMemoryCalloc(1, keyLen); + pNewPos->beUsed = true; + 
ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, keyLen); TSKEY ts = pFileState->getTs(pKey); if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) { int32_t len = 0; void *pVal = NULL; - streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len); - memcpy(pNewPos->pRowBuff, pVal, len); + int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len); + if (code == TSDB_CODE_SUCCESS) { + memcpy(pNewPos->pRowBuff, pVal, len); + } taosMemoryFree(pVal); } @@ -244,15 +274,21 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** return TSDB_CODE_SUCCESS; } - int32_t code = clearRowBuff(pFileState); - ASSERT(code == 0); pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + if (!pPos->pRowBuff) { + int32_t code = clearRowBuff(pFileState); + ASSERT(code == 0); + pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + ASSERT(pPos->pRowBuff); + } int32_t len = 0; - streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, pVal, &len); - memcpy(pPos->pRowBuff, pVal, len); - taosMemoryFree(pVal); + void *pBuff = NULL; + streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len); + memcpy(pPos->pRowBuff, pBuff, len); + taosMemoryFree(pBuff); (*pVal) = pPos->pRowBuff; + tdListPrepend(pFileState->usedBuffs, &pPos); return TSDB_CODE_SUCCESS; } @@ -292,6 +328,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize); } if (flushState) { diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 86e8ff1f26..f8791c4963 100644 --- a/tests/script/tsim/stream/basic1.sim +++ 
b/tests/script/tsim/stream/basic1.sim @@ -845,6 +845,8 @@ sql create stream streams7 trigger at_once IGNORE EXPIRED 0 into streamt7 as sel sql insert into ts1 values(1648791211000,1,2,3); sql_error insert into ts1 values(-1648791211000,1,2,3); +$loop_count = 0 + loop18: sleep 200 diff --git a/tests/script/tsim/stream/basic4.sim b/tests/script/tsim/stream/basic4.sim new file mode 100644 index 0000000000..b7e70636d6 --- /dev/null +++ b/tests/script/tsim/stream/basic4.sim @@ -0,0 +1,75 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10 +system sh/exec.sh -n dnode1 -s start + +sleep 10000 + +sql connect + +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s); + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791212001,2,2,3,1.1); +sql insert into t1 values(1648791213002,3,2,3,2.1); +sql insert into t1 values(1648791214003,4,2,3,3.1); +sql insert into t1 values(1648791215003,4,2,3,3.1); +sql insert into t1 values(1648791216004,4,2,3,4.1); +sql insert into t1 values(1648791217004,4,2,3,4.1); +sql insert into t1 values(1648791218004,4,2,3,4.1); + +sql insert into t1 values(1648791221004,4,2,3,4.1); +sql insert into t1 values(1648791222004,4,2,3,4.1); +sql insert into t1 values(1648791223004,4,2,3,4.1); +sql insert into t1 values(1648791224004,4,2,3,4.1); +sql insert into t1 values(1648791225005,4,2,3,4.1); +sql insert into t1 values(1648791226005,4,2,3,4.1); +sql insert into t1 values(1648791227005,4,2,3,4.1); +sql insert into t1 values(1648791228005,4,2,3,4.1); + +$loop_count = 0 + +loop0: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt +sql select * from streamt; + +if $rows 
!= 16 then + print =====rows=$rows + goto loop0 +endi + +sql insert into t1 values(1648791231004,4,2,3,4.1) (1648791232004,4,2,3,4.1) (1648791233004,4,2,3,4.1) (1648791234004,4,2,3,4.1) (1648791235004,4,2,3,4.1) (1648791236004,4,2,3,4.1) (1648791237004,4,2,3,4.1) (1648791238004,4,2,3,4.1) (1648791239004,4,2,3,4.1) (1648791240004,4,2,3,4.1) (1648791241004,4,2,3,4.1) (1648791242004,4,2,3,4.1) (1648791243004,4,2,3,4.1); + +$loop_count = 0 + +loop0: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 select * from streamt +sql select * from streamt; + +if $rows != 29 then + print =====rows=$rows + goto loop0 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file -- GitLab