diff --git a/cmake/cmake.define b/cmake/cmake.define index 1500858d9f541a061803c7bc76c514927813d5a2..f55a9bdabc79e31f129b2184144c9472572d5454 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9b80ce27860a639a8a406ee886004f586940afec..301b7e7abcfb2c33ebd2d5e332753e686737b14c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -27,6 +27,21 @@ extern "C" { #ifndef _STREAM_STATE_H_ #define _STREAM_STATE_H_ +typedef struct { + rocksdb_t* db; + rocksdb_writeoptions_t* writeOpts; + rocksdb_readoptions_t* readOpts; + rocksdb_options_t* dbOpt; + void* param; + void* env; + rocksdb_cache_t* cache; + TdThreadMutex mutex; + SList* list; +} SBackendHandle; +void* streamBackendInit(const char* path); +void streamBackendCleanup(void* arg); +SListNode* streamBackendAddCompare(void* backend, void* arg); +void streamBackendDelCompare(void* backend, void* arg); typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { @@ -35,11 +50,12 @@ typedef struct STdbState { rocksdb_writeoptions_t* writeOpts; rocksdb_readoptions_t* readOpts; rocksdb_options_t** cfOpts; - rocksdb_comparator_t** pCompare; rocksdb_options_t* dbOpt; struct SStreamTask* pOwner; void* param; void* env; + SListNode* pComparNode; + SBackendHandle* pBackendHandle; TDB* db; TTB* pStateDb; @@ -58,13 +74,15 @@ typedef struct { int32_t number; SSHashObj* parNameMap; int64_t checkPointId; + int32_t taskId; + int32_t streamId; } SStreamState; SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); -void streamStateClose(SStreamState* pState); +void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); -void streamStateDestroy(SStreamState* pState); +void streamStateDestroy(SStreamState* pState, bool remove); int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1f16659499b85177f83f56fbe041dc9311d04b0f..d821c24cdda8455ab79255b9b6d1d3c57a3cf472 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -347,6 +347,7 @@ typedef struct SStreamMeta { int32_t vgId; SRWLatch lock; int32_t walScan; + void* streamBackend; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/include/util/tlist.h b/include/util/tlist.h index e9a81d350e33cbc929a2a41daa83221eb84538ac..c684e90a3318444b706720dee0b64ba220437aee 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -228,6 +228,7 @@ void tdListPrependNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node); int32_t tdListPrepend(SList *list, void *data); int32_t tdListAppend(SList *list, const void *data); +SListNode *tdListAdd(SList *list, const void *data); SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); SListNode *tdListGetHead(SList *list); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ce987ca88ec930db76fcb560ff79f758e2bed1a2..f9078c257d0a75e90855e9c6f53d0501ee13868a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -90,7 +90,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } if (isDeepFree && pItem->pStreamState) { - streamStateClose(pItem->pStreamState); + streamStateClose(pItem->pStreamState, false); } if (isDeepFree && pInfo->taskInfo[i]) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index bd9300dfab234525ea3589e2bce342fd07c898d6..94caf18d553727fc3d2391f0e128c1ee4e392dae 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -24,8 +24,14 @@ #include "tcompare.h" #include "ttimer.h" -int streamInitBackend(SStreamState* pState, char* path); -void streamCleanBackend(SStreamState* pState); +typedef struct SCfComparator { + rocksdb_comparator_t** comp; + int32_t numOfComp; +} SCfComparator; +int streamStateOpenBackend(void* backend, SStreamState* pState); +void streamStateCloseBackend(SStreamState* pState, bool remove); +void streamStateDestroyCompar(void* arg); +// void streamStateRemoveBackend(SStreamState* pState); int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); @@ -73,7 +79,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t grou int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); -void streamStateDestroy_rocksdb(SStreamState* pState); +void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); void* streamStateCreateBatch(); int32_t streamStateGetBatchSize(void* pBatch); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 63141d621961e911e3c6519c371ba8eb6297fe0f..02c325d4baad017960cb7f5f96380a22bac914ad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -65,6 +65,19 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + + char* statePath = taosMemoryCalloc(1, len); + sprintf(statePath, "%s/%s", pMeta->path, "state"); + code = taosMulModeMkDir(statePath, 0755); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + taosMemoryFree(streamPath); + goto _err; + } + + pMeta->streamBackend = streamBackendInit(statePath); + taosMemoryFree(statePath); + taosInitRWLatch(&pMeta->lock); return pMeta; @@ -74,6 +87,7 @@ _err: if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); + if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta); return NULL; } @@ -101,6 +115,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); + streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -184,9 +199,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return 0; } -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { - return (int32_t) taosHashGetSize(pMeta->pTasks); -} +int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { return (int32_t)taosHashGetSize(pMeta->pTasks); } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); @@ -220,7 +233,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); + // + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 791d3013bfa378600711b08bb6b4f237d5fa4a7e..3467fa5ffe2e9c6584221849580fca459c8ad1d1 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -26,6 +26,88 @@ #define MAX_TABLE_NAME_NUM 100000 +void* streamBackendInit(const char* path) { + SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); + pHandle->list = tdListNew(sizeof(SCfComparator)); + taosThreadMutexInit(&pHandle->mutex, NULL); + + rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + rocksdb_env_set_low_priority_background_threads(env, 4); + rocksdb_env_set_high_priority_background_threads(env, 2); + + rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); + + rocksdb_options_t* opts = rocksdb_options_create(); + rocksdb_options_set_env(opts, env); + rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_missing_column_families(opts, 1); + rocksdb_options_set_write_buffer_size(opts, 128 << 20); + rocksdb_options_set_max_total_wal_size(opts, 128 << 20); + rocksdb_options_set_recycle_log_file_num(opts, 6); + rocksdb_options_set_max_write_buffer_number(opts, 3); + + pHandle->env = env; + pHandle->dbOpt = opts; + pHandle->cache = cache; + + char* err = NULL; + pHandle->db = rocksdb_open(opts, path, &err); + if (err != NULL) { + qError("failed to open rocksdb, path:%s, reason:%s", path, err); + taosMemoryFreeClear(err); + goto _EXIT; + } + + return pHandle; +_EXIT: + rocksdb_options_destroy(opts); + rocksdb_cache_destroy(cache); + rocksdb_env_destroy(env); + taosThreadMutexDestroy(&pHandle->mutex); + tdListFree(pHandle->list); + free(pHandle); + return NULL; +} +void streamBackendCleanup(void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)arg; + rocksdb_close(pHandle->db); + rocksdb_options_destroy(pHandle->dbOpt); + rocksdb_env_destroy(pHandle->env); + rocksdb_cache_destroy(pHandle->cache); + + taosThreadMutexDestroy(&pHandle->mutex); + SListNode* head = tdListPopHead(pHandle->list); + while (head != NULL) { + streamStateDestroyCompar(head->data); + taosMemoryFree(head); + head = tdListPopHead(pHandle->list); + } + tdListFree(pHandle->list); + + taosMemoryFree(pHandle); + + return; +} +SListNode* streamBackendAddCompare(void* backend, void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)backend; + SListNode* node = NULL; + taosThreadMutexLock(&pHandle->mutex); + node = tdListAdd(pHandle->list, arg); + taosThreadMutexUnlock(&pHandle->mutex); + return node; +} +void streamBackendDelCompare(void* backend, void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)backend; + SListNode* node = NULL; + taosThreadMutexLock(&pHandle->mutex); + node = tdListPopNode(pHandle->list, arg); + taosThreadMutexUnlock(&pHandle->mutex); + if (node) { + streamStateDestroyCompar(node->data); + taosMemoryFree(node); + } +} + int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; @@ -100,7 +182,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); if (pState->pTdbState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - streamStateDestroy(pState); + streamStateDestroy(pState, true); return NULL; } @@ -111,9 +193,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); } + pState->taskId = pTask->id.taskId; + pState->streamId = pTask->id.streamId; #ifdef USE_ROCKSDB qWarn("open stream state1"); - int code = streamInitBackend(pState, statePath); + int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); if (code == -1) { taosMemoryFree(pState); pState = NULL; @@ -205,14 +289,15 @@ _err: tdbTbClose(pState->pTdbState->pParNameDb); tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); - streamStateDestroy(pState); + streamStateDestroy(pState, false); return NULL; #endif } -void streamStateClose(SStreamState* pState) { +void streamStateClose(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB - streamCleanBackend(pState); + // streamStateCloseBackend(pState); + streamStateDestroy(pState, remove); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -224,7 +309,6 @@ void streamStateClose(SStreamState* pState) { tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); #endif - streamStateDestroy(pState); } int32_t streamStateBegin(SStreamState* pState) { @@ -388,6 +472,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo #ifdef USE_ROCKSDB int32_t code = 0; void* batch = streamStateCreateBatch(); + code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); if (code != 0) { return code; @@ -1077,10 +1162,10 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal #endif } -void streamStateDestroy(SStreamState* pState) { +void streamStateDestroy(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); - streamStateDestroy_rocksdb(pState); + streamStateDestroy_rocksdb(pState, remove); tSimpleHashCleanup(pState->parNameMap); // do nothong #endif diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 9c0f81894d9f3d720c4fe9cdb0920c76c5e309f6..4f336e8e711821fa43606815bcddd446217ac9ca 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -324,7 +324,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) { typedef struct { void* tableOpt; - void* lru; // global or not } rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -358,6 +357,9 @@ typedef struct { } SCfInit; +#define GEN_COLUMN_FAMILY_NAME(name, streamId, taskId, SUBFIX) \ + sprintf(name, "%d_%d_%s", (streamId), (taskId), (SUBFIX)); + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc}, @@ -378,21 +380,9 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } const char* compareParKeyName(void* name) { return ginitDict[5].key; } const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } -int streamInitBackend(SStreamState* pState, char* path) { - rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, 4); - rocksdb_env_set_high_priority_background_threads(env, 2); - - rocksdb_options_t* opts = rocksdb_options_create(); - rocksdb_options_set_env(opts, env); - // rocksdb_options_increase_parallelism(opts, 8); - // rocksdb_options_optimize_level_style_compaction(opts, 0); - // create the DB if it's not already present - rocksdb_options_set_create_if_missing(opts, 1); - rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 64 << 20); - rocksdb_options_set_recycle_log_file_num(opts, 6); - rocksdb_options_set_max_write_buffer_number(opts, 3); +int streamStateOpenBackend(void* backend, SStreamState* pState) { + qError("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId); + SBackendHandle* handle = backend; char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -400,11 +390,10 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { - cfOpt[i] = rocksdb_options_create_copy(opts); + cfOpt[i] = rocksdb_options_create(); // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); - rocksdb_block_based_options_set_block_cache(tableOpt, cache); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); rocksdb_block_based_options_set_filter_policy(tableOpt, filter); @@ -412,73 +401,93 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); param[i].tableOpt = tableOpt; - param[i].lru = cache; - // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); - // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); for (int i = 0; i < cfLen; i++) { - SCfInit* cf = &ginitDict[i]; + SCfInit* cf = &ginitDict[i]; + rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err); + for (int i = 0; i < cfLen; i++) { + char buf[64] = {0}; + GEN_COLUMN_FAMILY_NAME(buf, pState->streamId, pState->taskId, ginitDict[i].key); + cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); + if (err != NULL) { + qError("rocksdb create column family failed, reason:%s", err); + taosMemoryFree(err); + return -1; + } + } - pState->pTdbState->rocksdb = db; + pState->pTdbState->rocksdb = handle->db; pState->pTdbState->pHandle = cfHandle; pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); - // rocksdb_writeoptions_ - // rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1); pState->pTdbState->readOpts = rocksdb_readoptions_create(); pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; - pState->pTdbState->pCompare = pCompare; - pState->pTdbState->dbOpt = opts; + // pState->pTdbState->pCompare = pCompare; + pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = param; - pState->pTdbState->env = env; + + SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; + pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); + + rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); + qError("end to open backend, %p", pState); return 0; } -void streamCleanBackend(SStreamState* pState) { +void streamStateCloseBackend(SStreamState* pState, bool remove) { + char* status[] = {"remove", "drop"}; + qError("start to %s backend, %p, %d-%d", status[remove == false ? 1 : 0], pState, pState->streamId, pState->taskId); if (pState->pTdbState->rocksdb == NULL) { - qInfo("rocksdb already free"); return; } int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); rocksdbCfParam* param = pState->pTdbState->param; + + char* err = NULL; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + for (int i = 0; i < cfLen; i++) { + if (remove) { + rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); + } else { + rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); + } + } + rocksdb_flushoptions_destroy(flushOpt); + for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); } taosMemoryFreeClear(pState->pTdbState->pHandle); - rocksdb_options_destroy(pState->pTdbState->dbOpt); - + for (int i = 0; i < cfLen; i++) { + rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); + rocksdb_block_based_options_destroy(param[i].tableOpt); + } + if (remove) { + streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode); + } rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts); pState->pTdbState->writeOpts = NULL; rocksdb_readoptions_destroy(pState->pTdbState->readOpts); pState->pTdbState->readOpts = NULL; - - rocksdb_close(pState->pTdbState->rocksdb); - // wait for all background work to finish - for (int i = 0; i < cfLen; i++) { - rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); - rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); - - rocksdb_cache_destroy(param[i].lru); - rocksdb_block_based_options_destroy(param[i].tableOpt); - } taosMemoryFreeClear(pState->pTdbState->cfOpts); - taosMemoryFree(pState->pTdbState->pCompare); - taosMemoryFree(pState->pTdbState->param); - rocksdb_env_destroy(pState->pTdbState->env); - + taosMemoryFreeClear(pState->pTdbState->param); pState->pTdbState->rocksdb = NULL; } - +void streamStateDestroyCompar(void* arg) { + SCfComparator* comp = (SCfComparator*)arg; + for (int i = 0; i < comp->numOfComp; i++) { + rocksdb_comparator_destroy(comp->comp[i]); + } + taosMemoryFree(comp->comp); +} int streamGetInit(const char* funcName) { size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -1540,7 +1549,7 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } -void streamStateDestroy_rocksdb(SStreamState* pState) { +void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { // only close db - streamCleanBackend(pState); + streamStateCloseBackend(pState, remove); } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 67c60008fdb98b5f341be5271668a17bd3563c92..97500fe4c86de12e39d5425a370bc118eabbd408 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -27,7 +27,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) { pTask->id.streamId = streamId; char buf[128] = {0}; - sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); + sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -171,7 +171,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); - + int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { streamQueueClose(pTask->inputQueue); } @@ -204,10 +204,10 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->pState) { - streamStateClose(pTask->pState); + streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } - if (pTask->id.idStr != NULL) { + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); } diff --git a/source/util/src/tlist.c b/source/util/src/tlist.c index a89105bb42fea4941a6a08124fb0b9fb6a8fe6d7..db5a214ada1da9f323e7090a7b517a8e9725ae05 100644 --- a/source/util/src/tlist.c +++ b/source/util/src/tlist.c @@ -87,6 +87,15 @@ int32_t tdListAppend(SList *list, const void *data) { return 0; } +// return the node pointer +SListNode *tdListAdd(SList *list, const void *data) { + SListNode *node = (SListNode *)taosMemoryCalloc(1, sizeof(SListNode) + list->eleSize); + if (node == NULL) return NULL; + + memcpy((void *)(node->data), data, list->eleSize); + TD_DLIST_APPEND(list, node); + return node; +} SListNode *tdListPopHead(SList *list) { SListNode *node;