From 426a794e7e0592accdf9f7cd46bb385a921c7a63 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 21 Apr 2023 17:47:00 +0800 Subject: [PATCH] add ini --- source/libs/stream/src/tstreamFileState.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 7f5b7dcb18..5c059765f1 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -21,7 +21,7 @@ #include "thash.h" #include "tsimplehash.h" -#define FLUSH_RATIO 0.2 +#define FLUSH_RATIO 0.5 #define FLUSH_NUM 4 #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024); @@ -75,7 +75,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; - pFileState->flushMark = -1; + pFileState->flushMark = INT64_MIN; + pFileState->maxTs = INT64_MIN; recoverSnapshot(pFileState); return pFileState; @@ -147,6 +148,8 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { } void streamFileStateClear(SStreamFileState* pFileState) { + pFileState->flushMark = INT64_MIN; + pFileState->maxTs = INT64_MIN; tSimpleHashClear(pFileState->rowBuffMap); clearExpiredRowBuff(pFileState, 0, true); } @@ -262,6 +265,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi int32_t len = 0; void* pVal = NULL; int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len); + qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code); if (code == TSDB_CODE_SUCCESS) { memcpy(pNewPos->pRowBuff, pVal, len); } @@ -349,6 +353,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); + qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } if (streamStateGetBatchSize(batch) > 0) { code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); -- GitLab