You need to sign in or sign up before continuing.
提交 81bd7661 编写于 作者: dengyihao's avatar dengyihao

add ttl to stream state key

上级 2d521824
...@@ -41,6 +41,7 @@ typedef struct STdbState { ...@@ -41,6 +41,7 @@ typedef struct STdbState {
void* param; void* param;
void* env; void* env;
void* compactFactory; void* compactFactory;
rocksdb_cache_t* cache;
TDB* db; TDB* db;
TTB* pStateDb; TTB* pStateDb;
......
...@@ -39,6 +39,11 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, ...@@ -39,6 +39,11 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
return ret; return ret;
} }
} }
int streamStateValueIsStale(char* vv) {
int64_t ts = 0;
taosDecodeFixedI64(vv, &ts);
return ts < taosGetTimestampSec() ? 1 : 0;
}
int defaultKeyEncode(void* k, char* buf) { int defaultKeyEncode(void* k, char* buf) {
int len = strlen((char*)k); int len = strlen((char*)k);
memcpy(buf, (char*)k, len); memcpy(buf, (char*)k, len);
...@@ -333,7 +338,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) { ...@@ -333,7 +338,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) {
typedef struct { typedef struct {
void* tableOpt; void* tableOpt;
void* lru; // global or not
} rocksdbCfParam; } rocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
...@@ -486,6 +490,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -486,6 +490,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create( rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(opts, factory); rocksdb_options_set_compaction_filter_factory(opts, factory);
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
char* err = NULL; char* err = NULL;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
...@@ -496,7 +501,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -496,7 +501,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
cfOpt[i] = rocksdb_options_create_copy(opts); cfOpt[i] = rocksdb_options_create_copy(opts);
// refactor later // refactor later
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(32 << 20); // rocksdb_cache_t* cache = rocksdb_cache_create_lru(32 << 20);
rocksdb_block_based_options_set_block_cache(tableOpt, cache); rocksdb_block_based_options_set_block_cache(tableOpt, cache);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
...@@ -505,7 +510,6 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -505,7 +510,6 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
param[i].tableOpt = tableOpt; param[i].tableOpt = tableOpt;
param[i].lru = cache;
}; };
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
...@@ -531,6 +535,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -531,6 +535,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
pState->pTdbState->param = param; pState->pTdbState->param = param;
pState->pTdbState->env = env; pState->pTdbState->env = env;
pState->pTdbState->compactFactory = factory; pState->pTdbState->compactFactory = factory;
pState->pTdbState->cache = cache;
return 0; return 0;
} }
void streamCleanBackend(SStreamState* pState) { void streamCleanBackend(SStreamState* pState) {
...@@ -560,10 +565,10 @@ void streamCleanBackend(SStreamState* pState) { ...@@ -560,10 +565,10 @@ void streamCleanBackend(SStreamState* pState) {
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);
rocksdb_cache_destroy(param[i].lru);
rocksdb_block_based_options_destroy(param[i].tableOpt); rocksdb_block_based_options_destroy(param[i].tableOpt);
// rocksdb_compactionfilterfactory_destroy(param[i].filteFactory); // rocksdb_compactionfilterfactory_destroy(param[i].filteFactory);
} }
rocksdb_cache_destroy(pState->pTdbState->cache);
taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFreeClear(pState->pTdbState->cfOpts);
taosMemoryFree(pState->pTdbState->pCompare); taosMemoryFree(pState->pTdbState->pCompare);
taosMemoryFree(pState->pTdbState->param); taosMemoryFree(pState->pTdbState->param);
...@@ -907,6 +912,12 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin ...@@ -907,6 +912,12 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
// skip ttl expired data
while (rocksdb_iter_valid(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) break;
rocksdb_iter_next(pCur->iter);
}
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey; SStateKey curKey;
...@@ -936,8 +947,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK ...@@ -936,8 +947,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur->db = pState->pTdbState->rocksdb; pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) break;
rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
pCur = NULL;
}
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur; return pCur;
} }
...@@ -955,15 +976,19 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* ...@@ -955,15 +976,19 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
int len = stateKeyEncode((void*)&sKey, buf); int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len); rocksdb_iter_seek(pCur->iter, buf, len);
if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { if (rocksdb_iter_valid(pCur->iter)) {
pCur->number = pState->number; char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
return pCur; if (!streamStateValueIsStale(val)) {
SStateKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
pCur->number = pState->number;
return pCur;
}
} }
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
...@@ -1043,6 +1068,16 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta ...@@ -1043,6 +1068,16 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
while (rocksdb_iter_valid(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) {
break;
}
rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
return NULL;
}
int32_t c = 0; int32_t c = 0;
size_t klen; size_t klen;
...@@ -1078,6 +1113,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta ...@@ -1078,6 +1113,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
const char* iVal = rocksdb_iter_value(pCur->iter, NULL);
if (streamStateValueIsStale((char*)iVal)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
...@@ -1191,6 +1231,13 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK ...@@ -1191,6 +1231,13 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (rocksdb_iter_valid(pCur->iter)) {
const char* iVal = rocksdb_iter_value(pCur->iter, NULL);
if (streamStateValueIsStale((char*)iVal)) {
streamStateFreeCur(pCur);
return NULL;
}
}
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen; size_t kLen;
...@@ -1218,11 +1265,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, ...@@ -1218,11 +1265,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
size_t vlen = 0; size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
if (pVal != NULL) { char* dst = NULL;
char* dst = taosMemoryCalloc(1, vlen); int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
memcpy(dst, valStr, vlen); if (len < 0) {
*pVal = dst; return -1;
} }
if (pVal != NULL) *pVal = (char*)dst;
if (pVLen != NULL) *pVLen = vlen; if (pVLen != NULL) *pVLen = vlen;
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册