From d7e86aca05f7602fd6a8912a7602ce0b29b74c5d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 5 May 2023 15:14:16 +0000 Subject: [PATCH] fix case failure --- include/libs/stream/streamState.h | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 2 + source/libs/stream/src/streamBackendRocksdb.c | 159 +++++++++++++++++- 3 files changed, 154 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b2ca8cfcf2..63e9e3799a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -45,7 +45,7 @@ typedef struct STdbState { void* env; SListNode* pComparNode; void* pBackendHandle; - char idstr[48]; + char idstr[64]; void* compactFactory; TDB* db; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 5712d68561..eac7e10a4e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -42,6 +42,8 @@ typedef struct { TdThreadMutex mutex; rocksdb_compactionfilterfactory_t* filterFactory; SList* list; + TdThreadMutex cfMutex; + SHashObj* cfInst; } SBackendHandle; void* streamBackendInit(const char* path); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index fea9f4daa3..9599c0e214 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -21,6 +21,20 @@ typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; +typedef struct { + rocksdb_t* db; + rocksdb_column_family_handle_t** pHandle; + rocksdb_writeoptions_t* wOpt; + rocksdb_readoptions_t* rOpt; + rocksdb_options_t** cfOpt; + rocksdb_options_t* dbOpt; + void* param; + void* pBackendHandle; + SListNode* pCompareNode; +} RocksdbCfInst; + +RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr); + void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); const char* compactFilteFactoryName(void* arg); @@ -52,9 +66,12 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { + qDebug("init stream backend"); SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); + taosThreadMutexInit(&pHandle->cfMutex, NULL); + pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_set_low_priority_background_threads(env, 4); @@ -79,12 +96,44 @@ void* streamBackendInit(const char* path) { NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory); - char* err = NULL; - pHandle->db = rocksdb_open(opts, path, &err); - if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", path, err); + char* err = NULL; + size_t nCf = 0; + + char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err); + if (nCf == 0 || err != NULL) { taosMemoryFreeClear(err); - // goto _EXIT; + pHandle->db = rocksdb_open(opts, path, &err); + if (err != NULL) { + qError("failed to open rocksdb, path:%s, reason:%s", path, err); + taosMemoryFreeClear(err); + } + } else { + int64_t streamId; + int32_t taskId, dummpy = 0; + SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (size_t i = 0; i < nCf; i++) { + char* cf = cfs[i]; + char suffix[64] = {0}; + if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) { + char idstr[128] = {0}; + sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); + if (taosHashGet(tbl, idstr, strlen(idstr)) != NULL) { + taosHashPut(tbl, idstr, strlen(idstr), &dummpy, sizeof(dummpy)); + } + } else { + continue; + } + } + void* pIter = taosHashIterate(tbl, NULL); + while (pIter != NULL) { + size_t keyLen = 0; + char* key = taosHashGetKey(pIter, &keyLen); + RocksdbCfInst* inst = streamStateOpenBackendCf(pHandle, (char*)path, key); + taosHashPut(pHandle->cfInst, key, keyLen, &inst, sizeof(void*)); + + taosHashIterate(tbl, pIter); + } + taosHashCleanup(tbl); } return (void*)pHandle; @@ -93,6 +142,8 @@ _EXIT: rocksdb_cache_destroy(cache); rocksdb_env_destroy(env); taosThreadMutexDestroy(&pHandle->mutex); + taosThreadMutexDestroy(&pHandle->cfMutex); + taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); free(pHandle); @@ -124,6 +175,8 @@ void streamBackendCleanup(void* arg) { } // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); + taosThreadMutexDestroy(&pHandle->cfMutex); + taosHashCleanup(pHandle->cfInst); taosMemoryFree(pHandle); @@ -493,7 +546,7 @@ typedef struct { } SCfInit; -#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX)); +#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; @@ -616,11 +669,92 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c return filter; } +RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr) { + // qInfo("start to open backend cf, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + SBackendHandle* handle = backend; + + char* err = NULL; + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + char** cfNames = taosMemoryCalloc(cfLen, sizeof(char*)); + for (int i = 0; i < cfLen; i++) { + cfNames[i] = taosMemoryCalloc(1, 128); + GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[i].key); + } + + RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); + const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + for (int i = 0; i < cfLen; i++) { + cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt); + // refactor later + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); + + param[i].tableOpt = tableOpt; + }; + + rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); + for (int i = 0; i < cfLen; i++) { + SCfInit* cf = &ginitDict[i]; + + rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); + pCompare[i] = compare; + } + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); + rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, cfLen, (const char* const*)cfNames, + (const rocksdb_options_t* const*)cfOpt, cfHandle, &err); + if (err != NULL) { + qError("failed to open rocksdb cf, reason:%s", err); + taosMemoryFree(err); + } else { + qDebug("succ to open rocksdb cf, reason:%s", err); + } + RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); + + inst->db = db; + inst->pHandle = cfHandle; + inst->wOpt = rocksdb_writeoptions_create(); + inst->rOpt = rocksdb_readoptions_create(); + inst->cfOpt = (rocksdb_options_t**)cfOpt; + inst->dbOpt = handle->dbOpt; + inst->param = param; + inst->pBackendHandle = handle; + + handle->db = db; + + SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; + inst->pCompareNode = streamBackendAddCompare(handle, &compare); + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + return inst; +} int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); SBackendHandle* handle = backend; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); + taosThreadMutexLock(&handle->cfMutex); + RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + if (ppInst != NULL && *ppInst != NULL) { + RocksdbCfInst* inst = *ppInst; + pState->pTdbState->rocksdb = inst->db; + pState->pTdbState->pHandle = inst->pHandle; + pState->pTdbState->writeOpts = inst->wOpt; + pState->pTdbState->readOpts = inst->rOpt; + pState->pTdbState->cfOpts = inst->cfOpt; + pState->pTdbState->dbOpt = handle->dbOpt; + pState->pTdbState->param = inst->param; + pState->pTdbState->pBackendHandle = handle; + pState->pTdbState->pComparNode = inst->pCompareNode; + taosThreadMutexUnlock(&handle->cfMutex); + return 0; + } + taosThreadMutexUnlock(&handle->cfMutex); + char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -650,7 +784,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { } rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < cfLen; i++) { - char buf[64] = {0}; + char buf[128] = {0}; GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key); cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); if (err != NULL) { @@ -670,12 +804,21 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); + // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); return 0; } void streamStateCloseBackend(SStreamState* pState, bool remove) { + SBackendHandle* pHandle = pState->pTdbState->pBackendHandle; + taosThreadMutexLock(&pHandle->cfMutex); + RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + if (ppInst != NULL && *ppInst != NULL) { + RocksdbCfInst* inst = *ppInst; + taosMemoryFree(inst); + taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + } + taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); -- GitLab