提交 73d60547 编写于 作者: D dapan1121

dulicated CQ created when wal is re-executed issue

上级 a20fe8d2
......@@ -619,7 +619,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code));
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
return NULL;
......
......@@ -310,8 +310,23 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
}
SCqContext *pContext = handle;
int64_t rid = 0;
pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
while (pObj) {
if (pObj->uid == uid) {
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return (void *)rid;
}
pObj = pObj->next;
}
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
pthread_mutex_unlock(&pContext->mutex);
pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL;
pObj->uid = uid;
......@@ -386,12 +401,15 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj);
SSqlObj* pSql = (SSqlObj*)result;
if (code == TSDB_CODE_SUCCESS) {
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj);
}
}
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
......@@ -427,6 +445,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
return;
}
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册