未验证 提交 c9a725df 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4370 from taosdata/bugfix/td-2198

[TD-2198]<fix>: cq can be created more than once
...@@ -241,8 +241,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { ...@@ -241,8 +241,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqObj* pObj = (SCqObj*)param; SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext; SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result; SSqlObj* pSql = (SSqlObj*)result;
pContext->dbConn = pSql->pTscObj; if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj);
}
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
} }
static void cqProcessCreateTimer(void *param, void *tmrId) { static void cqProcessCreateTimer(void *param, void *tmrId) {
...@@ -253,7 +257,9 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { ...@@ -253,7 +257,9 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
cDebug("vgId:%d, try connect to TDengine", pContext->vgId); cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
} else { } else {
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
} }
} }
...@@ -267,6 +273,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -267,6 +273,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
} }
pObj->tmrId = 0; pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
pContext->num++; pContext->num++;
...@@ -274,6 +281,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -274,6 +281,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
} else { } else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
} }
}
} }
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册