From 42e44b4b0068fbcb8b290c5f927ec08c4b546aaa Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 22 May 2020 00:51:36 +0000 Subject: [PATCH] set up DB connection only when there is a continuous query --- src/cq/src/cqMain.c | 60 +++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e4f3142b89..7935bb7ff5 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -40,6 +40,7 @@ typedef struct { int num; // number of continuous streams struct SCqObj *pHead; void *dbConn; + int master; pthread_mutex_t mutex; } SCqContext; @@ -58,6 +59,7 @@ typedef struct SCqObj { int cqDebugFlag = 135; static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { @@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; pContext->ahandle = ahandle; + tscEmbedded = 1; pthread_mutex_init(&pContext->mutex, NULL); @@ -84,6 +87,8 @@ void cqClose(void *handle) { cqStop(pContext); // free all resources + pthread_mutex_lock(&pContext->mutex); + SCqObj *pObj = pContext->pHead; while (pObj) { SCqObj *pTemp = pObj; @@ -91,6 +96,8 @@ void cqClose(void *handle) { free(pTemp); } + pthread_mutex_unlock(&pContext->mutex); + pthread_mutex_destroy(&pContext->mutex); cTrace("vgId:%d, CQ is closed", pContext->vgId); @@ -100,28 +107,15 @@ void cqClose(void *handle) { void cqStart(void *handle) { SCqContext *pContext = handle; cTrace("vgId:%d, start all CQs", pContext->vgId); - if (pContext->dbConn) return; + if (pContext->dbConn || pContext->master) return; pthread_mutex_lock(&pContext->mutex); - tscEmbedded = 1; - pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); - if (pContext->dbConn == NULL) { - cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - pthread_mutex_unlock(&pContext->mutex); - return; - } + pContext->master = 1; SCqObj *pObj = pContext->pHead; while (pObj) { - int64_t lastKey = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); - } + cqCreateStream(pContext, pObj); pObj = pObj->next; } @@ -131,10 +125,11 @@ void cqStart(void *handle) { void cqStop(void *handle) { SCqContext *pContext = handle; cTrace("vgId:%d, stop all CQs", pContext->vgId); - if (pContext->dbConn == NULL) return; + if (pContext->dbConn == NULL || pContext->master == 0) return; pthread_mutex_lock(&pContext->mutex); + pContext->master = 0; SCqObj *pObj = pContext->pHead; while (pObj) { if (pObj->pStream) { @@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; - if (pContext->dbConn) { - int64_t lastKey = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); - } - } + cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); @@ -218,6 +204,26 @@ void cqDrop(void *handle) { pthread_mutex_lock(&pContext->mutex); } +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { + + if (pContext->dbConn == NULL) { + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn == NULL) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + } + return; + } + + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } +} + static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqObj *pObj = (SCqObj *)param; SCqContext *pContext = pObj->pContext; -- GitLab