diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 1be7552a892e25cf44317a99fe52ff57689ad338..84b2c297d0e0cd3e35c2b62564300e0a9bd9c5b6 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -241,8 +241,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; - pContext->dbConn = pSql->pTscObj; + if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { + taos_close(pSql->pTscObj); + } + pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); + pthread_mutex_unlock(&pContext->mutex); } static void cqProcessCreateTimer(void *param, void *tmrId) { @@ -253,7 +257,9 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { cDebug("vgId:%d, try connect to TDengine", pContext->vgId); taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); } else { + pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); + pthread_mutex_unlock(&pContext->mutex); } } @@ -267,12 +273,14 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { } pObj->tmrId = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + if (pObj->pStream == NULL) { + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } } }