From 643a9dfa186cad243b38e809014902f2a7296fc3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 16 May 2023 10:46:11 +0000 Subject: [PATCH] fix crash when taosd quit --- source/libs/stream/src/streamBackendRocksdb.c | 12 +++++++++--- source/libs/stream/src/streamMeta.c | 5 ++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 16ba81c74a..d566950410 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -109,7 +109,10 @@ void* streamBackendInit(const char* path) { if (err != NULL) { qError("failed to open rocksdb, path:%s, reason:%s", path, err); taosMemoryFreeClear(err); + rocksdb_list_column_families_destroy(cfs, nCf); + goto _EXIT; } + } else { /* list all cf and get prefix @@ -151,6 +154,11 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendHandle* pHandle = (SBackendHandle*)arg; + if (pHandle == NULL || pHandle->db == NULL) { + taosMemoryFree(pHandle); + return; + } + RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { RocksdbCfInst* inst = *pIter; @@ -158,7 +166,6 @@ void streamBackendCleanup(void* arg) { taosHashIterate(pHandle->cfInst, pIter); } taosHashCleanup(pHandle->cfInst); - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); char* err = NULL; rocksdb_flush(pHandle->db, flushOpt, &err); @@ -167,8 +174,8 @@ void streamBackendCleanup(void* arg) { taosMemoryFree(err); } rocksdb_flushoptions_destroy(flushOpt); - rocksdb_close(pHandle->db); + rocksdb_options_destroy(pHandle->dbOpt); rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); @@ -180,7 +187,6 @@ void streamBackendCleanup(void* arg) { taosMemoryFree(head); head = tdListPopHead(pHandle->list); } - // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); taosThreadMutexDestroy(&pHandle->cfMutex); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 682ce08c7f..270a2079e7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -90,6 +90,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(streamPath); + if (pMeta->streamBackend == NULL) { + terrno = -1; + goto _err; + } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosMemoryFree(streamPath); @@ -105,7 +109,6 @@ _err: if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); - // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta); qError("failed to open stream meta"); return NULL; -- GitLab