diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index eac7e10a4ec7614fbdcaffb340592a4d8269b38f..5d2970a4b70cb2ec68cb6c069d393513757871d4 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,6 +44,7 @@ typedef struct { SList* list; TdThreadMutex cfMutex; SHashObj* cfInst; + int64_t defaultCfInit; } SBackendHandle; void* streamBackendInit(const char* path); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9599c0e2140229602f22efc692a0b6b30eaf4317..793c6a4e1f036c21964d85f6b46e56ad69b6991a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -33,7 +33,9 @@ typedef struct { SListNode* pCompareNode; } RocksdbCfInst; -RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr); +int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids); + +void destroyRocksdbCfInst(RocksdbCfInst* inst); void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); @@ -100,7 +102,7 @@ void* streamBackendInit(const char* path) { size_t nCf = 0; char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err); - if (nCf == 0 || err != NULL) { + if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); pHandle->db = rocksdb_open(opts, path, &err); if (err != NULL) { @@ -117,24 +119,18 @@ void* streamBackendInit(const char* path) { if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) { char idstr[128] = {0}; sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); - if (taosHashGet(tbl, idstr, strlen(idstr)) != NULL) { - taosHashPut(tbl, idstr, strlen(idstr), &dummpy, sizeof(dummpy)); + // qError("make cf name %s", idstr); + if (taosHashGet(tbl, idstr, strlen(idstr) + 1) == NULL) { + taosHashPut(tbl, idstr, strlen(idstr) + 1, &dummpy, sizeof(dummpy)); } } else { continue; } } - void* pIter = taosHashIterate(tbl, NULL); - while (pIter != NULL) { - size_t keyLen = 0; - char* key = taosHashGetKey(pIter, &keyLen); - RocksdbCfInst* inst = streamStateOpenBackendCf(pHandle, (char*)path, key); - taosHashPut(pHandle->cfInst, key, keyLen, &inst, sizeof(void*)); - - taosHashIterate(tbl, pIter); - } + streamStateOpenBackendCf(pHandle, (char*)path, tbl); taosHashCleanup(tbl); } + rocksdb_list_column_families_destroy(cfs, nCf); return (void*)pHandle; _EXIT: @@ -151,6 +147,13 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendHandle* pHandle = (SBackendHandle*)arg; + RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); + while (pIter != NULL) { + RocksdbCfInst* inst = *pIter; + destroyRocksdbCfInst(inst); + taosHashIterate(pHandle->cfInst, pIter); + } + taosHashCleanup(pHandle->cfInst); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); char* err = NULL; @@ -176,7 +179,6 @@ void streamBackendCleanup(void* arg) { // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); taosThreadMutexDestroy(&pHandle->cfMutex); - taosHashCleanup(pHandle->cfInst); taosMemoryFree(pHandle); @@ -669,22 +671,56 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c return filter; } -RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr) { - // qInfo("start to open backend cf, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); - SBackendHandle* handle = backend; - - char* err = NULL; - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - char** cfNames = taosMemoryCalloc(cfLen, sizeof(char*)); +void destroyRocksdbCfInst(RocksdbCfInst* inst) { + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < cfLen; i++) { - cfNames[i] = taosMemoryCalloc(1, 128); - GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[i].key); + rocksdb_column_family_handle_destroy(inst->pHandle[i]); } - RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); - const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); - for (int i = 0; i < cfLen; i++) { - cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt); + rocksdb_writeoptions_destroy(inst->wOpt); + inst->wOpt = NULL; + + rocksdb_readoptions_destroy(inst->rOpt); + taosMemoryFree(inst->cfOpt); + taosMemoryFree(inst->param); + taosMemoryFreeClear(inst->param); + taosMemoryFree(inst); +} + +int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { + SBackendHandle* handle = backend; + char* err = NULL; + size_t nSize = taosHashGetSize(ids); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + + char** cfNames = taosMemoryCalloc(nSize * cfLen + 1, sizeof(char*)); + void* pIter = taosHashIterate(ids, NULL); + size_t keyLen = 0; + char* idstr = taosHashGetKey(pIter, &keyLen); + for (int i = 0; i < nSize * cfLen + 1; i++) { + cfNames[i] = (char*)taosMemoryCalloc(1, 128); + if (i == 0) { + memcpy(cfNames[0], "default", strlen("default")); + continue; + } + qError("cf name %s", idstr); + + GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); + if (i % cfLen == 0) { + pIter = taosHashIterate(ids, pIter); + if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); + } + } + for (int i = 0; i < nSize * cfLen + 1; i++) { + qError("cf name %s", cfNames[i]); + } + rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); + RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); + for (int i = 0; i < nSize * cfLen + 1; i++) { + cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt); + if (i == 0) { + continue; + } // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); @@ -692,45 +728,77 @@ RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr) rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_block_based_options_set_filter_policy(tableOpt, filter); - rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); - - param[i].tableOpt = tableOpt; + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); + params[i].tableOpt = tableOpt; }; - rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); - for (int i = 0; i < cfLen; i++) { - SCfInit* cf = &ginitDict[i]; + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_comparator_t**)); + for (int i = 0; i < nSize * cfLen + 1; i++) { + if (i == 0) { + continue; + } + SCfInit* cf = &ginitDict[(i - 1) % cfLen]; rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, cfLen, (const char* const*)cfNames, - (const rocksdb_options_t* const*)cfOpt, cfHandle, &err); + rocksdb_column_family_handle_t** cfHandle = + taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_column_family_handle_t*)); + rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nSize * cfLen + 1, (const char* const*)cfNames, + (const rocksdb_options_t* const*)cfOpts, cfHandle, &err); if (err != NULL) { qError("failed to open rocksdb cf, reason:%s", err); taosMemoryFree(err); } else { qDebug("succ to open rocksdb cf, reason:%s", err); } - RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); - - inst->db = db; - inst->pHandle = cfHandle; - inst->wOpt = rocksdb_writeoptions_create(); - inst->rOpt = rocksdb_readoptions_create(); - inst->cfOpt = (rocksdb_options_t**)cfOpt; - inst->dbOpt = handle->dbOpt; - inst->param = param; - inst->pBackendHandle = handle; - handle->db = db; + pIter = taosHashIterate(ids, NULL); + idstr = taosHashGetKey(pIter, &keyLen); + for (int i = 0; i < nSize; i++) { + RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); + rocksdb_column_family_handle_t** subCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); + rocksdb_comparator_t** subCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); + RocksdbCfParam* subParam = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); + rocksdb_options_t** subOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + for (int j = 0; j < cfLen; j++) { + subCf[j] = cfHandle[i * cfLen + j + 1]; + subCompare[j] = pCompare[i * cfLen + j + 1]; + subParam[j] = params[i * cfLen + j + 1]; + subOpt[j] = cfOpts[i * cfLen + j + 1]; + } + inst->db = db; + inst->pHandle = subCf; + inst->wOpt = rocksdb_writeoptions_create(); + inst->rOpt = rocksdb_readoptions_create(); + inst->cfOpt = (rocksdb_options_t**)subOpt; + inst->dbOpt = handle->dbOpt; + inst->param = subParam; + inst->pBackendHandle = handle; + handle->db = db; + SCfComparator compare = {.comp = subCompare, .numOfComp = cfLen}; + inst->pCompareNode = streamBackendAddCompare(handle, &compare); + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + + taosHashPut(handle->cfInst, idstr, keyLen, &inst, sizeof(void*)); + + pIter = taosHashIterate(ids, pIter); + if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); + } + rocksdb_column_family_handle_destroy(cfHandle[0]); + rocksdb_options_destroy(cfOpts[0]); + + for (int i = 0; i < nSize * cfLen + 1; i++) { + taosMemoryFree(cfNames[i]); + } + taosMemoryFree(cfNames); + taosMemoryFree(cfHandle); + taosMemoryFree(pCompare); + taosMemoryFree(params); + taosMemoryFree(cfOpts); - SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; - inst->pCompareNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); - return inst; + return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); @@ -738,7 +806,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); taosThreadMutexLock(&handle->cfMutex); - RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; pState->pTdbState->rocksdb = inst->db; @@ -812,13 +880,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { void streamStateCloseBackend(SStreamState* pState, bool remove) { SBackendHandle* pHandle = pState->pTdbState->pBackendHandle; taosThreadMutexLock(&pHandle->cfMutex); - RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; taosMemoryFree(inst); - taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr)); + taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); } taosThreadMutexUnlock(&pHandle->cfMutex); + char* status[] = {"close", "drop"}; qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0ec8f0458a33d68e30b7ab09cef068a0fff69c62..2b5be9b3e430e781ffdefa6f6e43a799b4b6f0a2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -94,6 +94,7 @@ _err: if (pMeta->db) tdbClose(pMeta->db); // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta); + qError("failed to open stream meta"); return NULL; }