未验证 提交 b0e0f973 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #5816 from taosdata/hotfix/TD-3774

[TD-3774]dulicated CQ created when wal is re-executed issue
...@@ -619,7 +619,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -619,7 +619,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code); tscCreateStream(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code)); tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
free(pStream); free(pStream);
return NULL; return NULL;
......
...@@ -311,7 +311,22 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch ...@@ -311,7 +311,22 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
SCqContext *pContext = handle; SCqContext *pContext = handle;
int64_t rid = 0; int64_t rid = 0;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
while (pObj) {
if (pObj->uid == uid) {
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return (void *)rid;
}
pObj = pObj->next;
}
pthread_mutex_unlock(&pContext->mutex);
pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL; if (pObj == NULL) return NULL;
pObj->uid = uid; pObj->uid = uid;
...@@ -389,9 +404,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { ...@@ -389,9 +404,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqContext* pContext = pObj->pContext; SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result; SSqlObj* pSql = (SSqlObj*)result;
if (code == TSDB_CODE_SUCCESS) {
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj); taos_close(pSql->pTscObj);
} }
}
pthread_mutex_lock(&pContext->mutex); pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
...@@ -427,6 +445,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -427,6 +445,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl); pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
return; return;
} }
pObj->tmrId = 0; pObj->tmrId = 0;
if (pObj->pStream == NULL) { if (pObj->pStream == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册