diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 7699e6f45973c8ca834e0e93ad5f5f31797ec520..d90b5fead16ab78fef1bf828f8b8fcbc88ae9c78 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -619,7 +619,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p if (code == TSDB_CODE_SUCCESS) { tscCreateStream(pStream, pSql, code); } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code)); + tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); free(pStream); return NULL; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d4d202267c37365adc2647df3874185bc669b4a2..fb683786adee33c62ba7c4af1642c9a4e83a4406 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -310,8 +310,23 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch } SCqContext *pContext = handle; int64_t rid = 0; + + pthread_mutex_lock(&pContext->mutex); + + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->uid == uid) { + rid = pObj->rid; + pthread_mutex_unlock(&pContext->mutex); + return (void *)rid; + } + + pObj = pObj->next; + } - SCqObj *pObj = calloc(sizeof(SCqObj), 1); + pthread_mutex_unlock(&pContext->mutex); + + pObj = calloc(sizeof(SCqObj), 1); if (pObj == NULL) return NULL; pObj->uid = uid; @@ -386,12 +401,15 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { if (pObj == NULL) { return; } - + SCqContext* pContext = pObj->pContext; - SSqlObj* pSql = (SSqlObj*)result; - if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { - taos_close(pSql->pTscObj); + SSqlObj* pSql = (SSqlObj*)result; + if (code == TSDB_CODE_SUCCESS) { + if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { + taos_close(pSql->pTscObj); + } } + pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); @@ -427,6 +445,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl); return; } + pObj->tmrId = 0; if (pObj->pStream == NULL) {