diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index a5de27d7fc0cc3922456f5311834f2bd812f7a57..cc1628c968afd339440186e689850a46dba4af2b 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -125,8 +125,6 @@ void cqFree(void *handle) { pthread_mutex_unlock(&pContext->mutex); if (delete) { - pthread_mutex_unlock(&pContext->mutex); - pthread_mutex_destroy(&pContext->mutex); taosTmrCleanUp(pContext->tmrCtrl); @@ -186,6 +184,18 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { return pContext; } +static void freeSCqContext(void *handle) { + if (handle == NULL) { + return; + } + SCqContext *pContext = handle; + pthread_mutex_destroy(&pContext->mutex); + + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + cDebug("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); +} void cqClose(void *handle) { if (tsEnableStream == 0) { return; @@ -217,6 +227,7 @@ void cqClose(void *handle) { taosRemoveRef(cqObjRef, rid); } + freeSCqContext(pContext); } void cqStart(void *handle) {