diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cdcd4824425693f31858dea1714f90e1ab531909..37b4f45df68ebdab81db6b8193632d4d7179453e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t code = -1; SArray* refs = taosArrayInit(16, sizeof(int64_t)); - SArray* pCf = taosArrayInit(16, POINTER_BYTES); rocksdb_column_family_handle_t** ppCf = NULL; char* pChkpDir = NULL; char* pChkpIdDir = NULL; if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - goto _ERROR; + taosArrayDestroy(refs); + return code; } SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); @@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { goto _ERROR; } + // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); @@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } else { qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } - + // release all ref to cfWrapper; for (int i = 0; i < taosArrayGetSize(refs); i++) { int64_t id = *(int64_t*)taosArrayGet(refs, i); taosReleaseRef(streamBackendCfWrapperId, id);