From c2de8c91ddd01f3cf9608908348a5911b3b0f7a9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 4 May 2023 11:52:29 +0000 Subject: [PATCH] fix mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8339d10579..e8d51c4307 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -29,11 +29,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c char** newval, size_t* newvlen, unsigned char* value_changed); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); -void* streamBackendInit(const char* path); -void streamBackendCleanup(void* arg); -SListNode* streamBackendAddCompare(void* backend, void* arg); -void streamBackendDelCompare(void* backend, void* arg); - typedef struct { void* tableOpt; } RocksdbCfParam; @@ -103,7 +98,16 @@ _EXIT: return NULL; } void streamBackendCleanup(void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)arg; + SBackendHandle* pHandle = (SBackendHandle*)arg; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + char* err = NULL; + rocksdb_flush(pHandle->db, flushOpt, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + } + rocksdb_flushoptions_destroy(flushOpt); + rocksdb_close(pHandle->db); rocksdb_options_destroy(pHandle->dbOpt); rocksdb_env_destroy(pHandle->env); @@ -168,7 +172,7 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, int streamStateValueIsStale(char* vv) { int64_t ts = 0; taosDecodeFixedI64(vv, &ts); - return (ts != 0 && ts < taosGetTimestampSec()) ? 1 : 0; + return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; } int iterValueIsStale(rocksdb_iterator_t* iter) { size_t len; @@ -508,15 +512,19 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; - int64_t now = taosGetTimestampMs(); - p = taosDecodeFixedI64(p, &key.unixTimestamp); - p = taosDecodeFixedI32(p, &key.len); - p = taosDecodeBinary(p, (void**)&(key.data), key.len); - if (key.unixTimestamp != 0 && key.unixTimestamp < now) { - taosMemoryFree(key.data); + if (streamStateValueIsStale(p)) { *dest = NULL; return -1; } + int64_t now = taosGetTimestampMs(); + p = taosDecodeFixedI64(p, &key.unixTimestamp); + p = taosDecodeFixedI32(p, &key.len); + if (key.len == 0) { + key.data = NULL; + } else { + p = taosDecodeBinary(p, (void**)&(key.data), key.len); + } + if (ttl != NULL) { *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; } @@ -1164,6 +1172,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + return NULL; } while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter); @@ -1544,7 +1553,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe memset(tmp, 0, valSize); _end: - + taosMemoryFree(*pVal); *pVal = tmp; streamStateFreeCur(pCur); return res; -- GitLab