diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index f620eb4dd55754146ddd164d372bb1233cf94aba..d4d202267c37365adc2647df3874185bc669b4a2 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -73,6 +73,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); int32_t cqObjRef = -1; +int32_t cqVnodeNum = 0; void cqRmFromList(SCqObj *pObj) { //LOCK in caller @@ -166,6 +167,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { return NULL; } + atomic_add_fetch_32(&cqVnodeNum, 1); + cqCreateRef(); pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); @@ -240,6 +243,13 @@ void cqClose(void *handle) { if (hasCq == 0) { freeSCqContext(pContext); } + + int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1); + if (remainn <= 0) { + int32_t ref = cqObjRef; + cqObjRef = -1; + taosCloseRef(ref); + } } void cqStart(void *handle) {