提交 c2de8c91 编写于 作者: dengyihao's avatar dengyihao

fix mem leak

上级 f9c49f49
...@@ -29,11 +29,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c ...@@ -29,11 +29,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
char** newval, size_t* newvlen, unsigned char* value_changed); char** newval, size_t* newvlen, unsigned char* value_changed);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
typedef struct { typedef struct {
void* tableOpt; void* tableOpt;
} RocksdbCfParam; } RocksdbCfParam;
...@@ -103,7 +98,16 @@ _EXIT: ...@@ -103,7 +98,16 @@ _EXIT:
return NULL; return NULL;
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)arg; SBackendHandle* pHandle = (SBackendHandle*)arg;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
char* err = NULL;
rocksdb_flush(pHandle->db, flushOpt, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
}
rocksdb_flushoptions_destroy(flushOpt);
rocksdb_close(pHandle->db); rocksdb_close(pHandle->db);
rocksdb_options_destroy(pHandle->dbOpt); rocksdb_options_destroy(pHandle->dbOpt);
rocksdb_env_destroy(pHandle->env); rocksdb_env_destroy(pHandle->env);
...@@ -168,7 +172,7 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, ...@@ -168,7 +172,7 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
int streamStateValueIsStale(char* vv) { int streamStateValueIsStale(char* vv) {
int64_t ts = 0; int64_t ts = 0;
taosDecodeFixedI64(vv, &ts); taosDecodeFixedI64(vv, &ts);
return (ts != 0 && ts < taosGetTimestampSec()) ? 1 : 0; return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
} }
int iterValueIsStale(rocksdb_iterator_t* iter) { int iterValueIsStale(rocksdb_iterator_t* iter) {
size_t len; size_t len;
...@@ -508,15 +512,19 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { ...@@ -508,15 +512,19 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0}; SStreamValue key = {0};
char* p = value; char* p = value;
int64_t now = taosGetTimestampMs(); if (streamStateValueIsStale(p)) {
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
if (key.unixTimestamp != 0 && key.unixTimestamp < now) {
taosMemoryFree(key.data);
*dest = NULL; *dest = NULL;
return -1; return -1;
} }
int64_t now = taosGetTimestampMs();
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
if (key.len == 0) {
key.data = NULL;
} else {
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
}
if (ttl != NULL) { if (ttl != NULL) {
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
} }
...@@ -1164,6 +1172,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta ...@@ -1164,6 +1172,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
return NULL;
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter); while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter);
...@@ -1544,7 +1553,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe ...@@ -1544,7 +1553,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
memset(tmp, 0, valSize); memset(tmp, 0, valSize);
_end: _end:
taosMemoryFree(*pVal);
*pVal = tmp; *pVal = tmp;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册