提交 9c9fd839 编写于 作者: D dapan1121

fix bug

上级 9777218e
......@@ -91,6 +91,20 @@ void cqRmFromList(SCqObj *pObj) {
}
static void freeSCqContext(void *handle) {
if (handle == NULL) {
return;
}
SCqContext *pContext = handle;
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
......@@ -125,13 +139,7 @@ void cqFree(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
if (delete) {
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
freeSCqContext(pContext);
}
}
......@@ -184,18 +192,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return pContext;
}
static void freeSCqContext(void *handle) {
if (handle == NULL) {
return;
}
SCqContext *pContext = handle;
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
void cqClose(void *handle) {
if (tsEnableStream == 0) {
return;
......@@ -204,6 +201,8 @@ void cqClose(void *handle) {
if (handle == NULL) return;
pContext->delete = 1;
int32_t hasCq = 0;
int32_t existLoop = 0;
// stop all CQs
cqStop(pContext);
......@@ -218,6 +217,12 @@ void cqClose(void *handle) {
cqRmFromList(pObj);
rid = pObj->rid;
hasCq = 1;
if (pContext->pHead == NULL) {
existLoop = 1;
}
} else {
pthread_mutex_unlock(&pContext->mutex);
break;
......@@ -226,9 +231,15 @@ void cqClose(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
taosRemoveRef(cqObjRef, rid);
if (existLoop) {
break;
}
}
freeSCqContext(pContext);
if (hasCq == 0) {
freeSCqContext(pContext);
}
}
void cqStart(void *handle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册