提交 c64a7d3e 编写于 作者: dengyihao's avatar dengyihao

add ttl to stream state key

上级 05df0f6e
...@@ -236,7 +236,7 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -236,7 +236,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_WINDOWS}) endif(${TD_WINDOWS})
if(${TD_DARWIN} OR ${TD_WINDOWS}) if(${TD_DARWIN})
option(HAVE_THREAD_LOCAL "" OFF) option(HAVE_THREAD_LOCAL "" OFF)
option(WITH_IOSTATS_CONTEXT "" OFF) option(WITH_IOSTATS_CONTEXT "" OFF)
option(WITH_PERF_CONTEXT "" OFF) option(WITH_PERF_CONTEXT "" OFF)
......
...@@ -362,7 +362,10 @@ const char* compareFuncKeyName(void* name); ...@@ -362,7 +362,10 @@ const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name); const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name); const char* comparePartagKeyName(void* name);
void destroyFunc(void* stata) { return; } void destroyFunc(void* arg) {
(void)arg;
return;
}
typedef struct { typedef struct {
const char* key; const char* key;
...@@ -434,13 +437,34 @@ SCfInit ginitDict[] = { ...@@ -434,13 +437,34 @@ SCfInit ginitDict[] = {
encodeValueFunc, decodeValueFunc}, encodeValueFunc, decodeValueFunc},
}; };
const char* compareDefaultName(void* name) { return ginitDict[0].key; } const char* compareDefaultName(void* arg) {
const char* compareStateName(void* name) { return ginitDict[1].key; } (void)arg;
const char* compareWinKeyName(void* name) { return ginitDict[2].key; } return ginitDict[0].key;
const char* compareSessionKeyName(void* name) { return ginitDict[3].key; } }
const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } const char* compareStateName(void* arg) {
const char* compareParKeyName(void* name) { return ginitDict[5].key; } (void)arg;
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } return ginitDict[1].key;
}
const char* compareWinKeyName(void* arg) {
(void)arg;
return ginitDict[2].key;
}
const char* compareSessionKeyName(void* arg) {
(void)arg;
return ginitDict[3].key;
}
const char* compareFuncKeyName(void* arg) {
(void)arg;
return ginitDict[4].key;
}
const char* compareParKeyName(void* arg) {
(void)arg;
return ginitDict[5].key;
}
const char* comparePartagKeyName(void* arg) {
(void)arg;
return ginitDict[6].key;
}
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
void* status; void* status;
...@@ -454,9 +478,7 @@ const char* compactFilteFactoryName(void* arg) { ...@@ -454,9 +478,7 @@ const char* compactFilteFactoryName(void* arg) {
return "stream_compact_filter"; return "stream_compact_filter";
} }
void destroyCompactFilte(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; }
if (arg == NULL) return;
}
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
int64_t unixTime = taosGetTimestampMs(); int64_t unixTime = taosGetTimestampMs();
...@@ -485,8 +507,8 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -485,8 +507,8 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_set_env(opts, env); rocksdb_options_set_env(opts, env);
// rocksdb_options_increase_parallelism(opts, 8); // rocksdb_options_increase_parallelism(opts, 8);
// rocksdb_options_optimize_level_style_compaction(opts, 0); // rocksdb_options_optimize_level_style_compaction(opts, 0);
// create the DB if it's not already present // create the DB if it's not already present
rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1);
rocksdb_options_set_write_buffer_size(opts, 128 << 20); rocksdb_options_set_write_buffer_size(opts, 128 << 20);
...@@ -494,6 +516,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -494,6 +516,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create( rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(opts, factory); rocksdb_options_set_compaction_filter_factory(opts, factory);
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
char* err = NULL; char* err = NULL;
...@@ -570,15 +593,12 @@ void streamCleanBackend(SStreamState* pState) { ...@@ -570,15 +593,12 @@ void streamCleanBackend(SStreamState* pState) {
rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);
rocksdb_block_based_options_destroy(param[i].tableOpt); rocksdb_block_based_options_destroy(param[i].tableOpt);
// rocksdb_compactionfilterfactory_destroy(param[i].filteFactory);
} }
rocksdb_cache_destroy(pState->pTdbState->cache); rocksdb_cache_destroy(pState->pTdbState->cache);
taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFreeClear(pState->pTdbState->cfOpts);
taosMemoryFree(pState->pTdbState->pCompare); taosMemoryFree(pState->pTdbState->pCompare);
taosMemoryFree(pState->pTdbState->param); taosMemoryFree(pState->pTdbState->param);
rocksdb_env_destroy(pState->pTdbState->env); rocksdb_env_destroy(pState->pTdbState->env);
rocksdb_compactionfilterfactory_destroy(pState->pTdbState->compactFactory);
pState->pTdbState->rocksdb = NULL; pState->pTdbState->rocksdb = NULL;
} }
...@@ -612,12 +632,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -612,12 +632,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
} }
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt; *readOpt = rOpt;
// rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0); rocksdb_readoptions_set_fill_cache(rOpt, 0);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
...@@ -690,6 +708,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -690,6 +708,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
} else { \ } else { \
taosMemoryFree(p); \ taosMemoryFree(p); \
} \ } \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \ if (vLen != NULL) *vLen = len; \
} \ } \
if (err != NULL) { \ if (err != NULL) { \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册