diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index d7c9940b745afc4edc3308f7354d9afe7ab38109..2d36a54288e48dab97805b3b4ed6ec66ec51f127 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -41,6 +41,7 @@ typedef struct STdbState { void* param; void* env; void* compactFactory; + rocksdb_cache_t* cache; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index b6838651f0f709cb1c5e5c60212e59e2c8d1bad4..dee215a336964e268c3bfab6f99c43d20eb73178 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -39,6 +39,11 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, return ret; } } +int streamStateValueIsStale(char* vv) { + int64_t ts = 0; + taosDecodeFixedI64(vv, &ts); + return ts < taosGetTimestampSec() ? 1 : 0; +} int defaultKeyEncode(void* k, char* buf) { int len = strlen((char*)k); memcpy(buf, (char*)k, len); @@ -333,7 +338,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) { typedef struct { void* tableOpt; - void* lru; // global or not } rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -486,6 +490,7 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create( NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); rocksdb_options_set_compaction_filter_factory(opts, factory); + rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -496,7 +501,7 @@ int streamInitBackend(SStreamState* pState, char* path) { cfOpt[i] = rocksdb_options_create_copy(opts); // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(32 << 20); + // rocksdb_cache_t* cache = rocksdb_cache_create_lru(32 << 20); rocksdb_block_based_options_set_block_cache(tableOpt, cache); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); @@ -505,7 +510,6 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); param[i].tableOpt = tableOpt; - param[i].lru = cache; }; rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); @@ -531,6 +535,7 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->param = param; pState->pTdbState->env = env; pState->pTdbState->compactFactory = factory; + pState->pTdbState->cache = cache; return 0; } void streamCleanBackend(SStreamState* pState) { @@ -560,10 +565,10 @@ void streamCleanBackend(SStreamState* pState) { rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); - rocksdb_cache_destroy(param[i].lru); rocksdb_block_based_options_destroy(param[i].tableOpt); // rocksdb_compactionfilterfactory_destroy(param[i].filteFactory); } + rocksdb_cache_destroy(pState->pTdbState->cache); taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFree(pState->pTdbState->pCompare); taosMemoryFree(pState->pTdbState->param); @@ -907,6 +912,12 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin streamStateFreeCur(pCur); return NULL; } + // skip ttl expired data + while (rocksdb_iter_valid(pCur->iter)) { + char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); + if (!streamStateValueIsStale(val)) break; + rocksdb_iter_next(pCur->iter); + } if (rocksdb_iter_valid(pCur->iter)) { SStateKey curKey; @@ -936,8 +947,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); + rocksdb_iter_prev(pCur->iter); + while (rocksdb_iter_valid(pCur->iter)) { + char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); + if (!streamStateValueIsStale(val)) break; + rocksdb_iter_prev(pCur->iter); + } + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + pCur = NULL; + } STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } @@ -955,15 +976,19 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* int len = stateKeyEncode((void*)&sKey, buf); rocksdb_iter_seek(pCur->iter, buf, len); - if (rocksdb_iter_valid(pCur->iter)) { - SStateKey curKey; - size_t kLen = 0; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - pCur->number = pState->number; - return pCur; + if (rocksdb_iter_valid(pCur->iter)) { + char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); + if (!streamStateValueIsStale(val)) { + SStateKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + pCur->number = pState->number; + return pCur; + } } } streamStateFreeCur(pCur); @@ -1043,6 +1068,16 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta streamStateFreeCur(pCur); return NULL; } + while (rocksdb_iter_valid(pCur->iter)) { + char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); + if (!streamStateValueIsStale(val)) { + break; + } + rocksdb_iter_prev(pCur->iter); + } + if (!rocksdb_iter_valid(pCur->iter)) { + return NULL; + } int32_t c = 0; size_t klen; @@ -1078,6 +1113,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta streamStateFreeCur(pCur); return NULL; } + const char* iVal = rocksdb_iter_value(pCur->iter, NULL); + if (streamStateValueIsStale((char*)iVal)) { + streamStateFreeCur(pCur); + return NULL; + } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -1191,6 +1231,13 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK streamStateFreeCur(pCur); return NULL; } + if (rocksdb_iter_valid(pCur->iter)) { + const char* iVal = rocksdb_iter_value(pCur->iter, NULL); + if (streamStateValueIsStale((char*)iVal)) { + streamStateFreeCur(pCur); + return NULL; + } + } if (rocksdb_iter_valid(pCur->iter)) { size_t kLen; @@ -1218,11 +1265,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - if (pVal != NULL) { - char* dst = taosMemoryCalloc(1, vlen); - memcpy(dst, valStr, vlen); - *pVal = dst; + char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + if (len < 0) { + return -1; } + + if (pVal != NULL) *pVal = (char*)dst; if (pVLen != NULL) *pVLen = vlen; } else {