diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0e63fa35514ea1b8f237de3031962e70140f4f4b..6e273df38fb9a3889b4d499d2bf2165d16d20f59 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -405,6 +405,7 @@ typedef struct SSqlObj { typedef struct SSqlStream { SSqlObj *pSql; + void * cqhandle; // stream belong to SCQContext handle const char* dstTable; uint32_t streamId; char listed; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index ea2a3d82288318c79fd38f2c3019a70438589ef9..5b2198be622de1b5fbe63d0629aa1bc10e1e72f2 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -19,6 +19,7 @@ #include "ttimer.h" #include "tutil.h" #include "taosmsg.h" +#include "tcq.h" #include "taos.h" @@ -294,24 +295,34 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { return msgLen; } -void tscKillConnection(STscObj *pObj) { - pthread_mutex_lock(&pObj->mutex); +// cqContext->dbconn is killed then call this callback +void cqConnKilledNotify(void* handle, void* conn) { + if (handle == NULL || conn == NULL){ + return ; + } - SSqlObj *pSql = pObj->sqlList; - while (pSql) { - pSql = pSql->next; - } - + SCqContext* pContext = (SCqContext*) handle; + if (pContext->dbConn == conn){ + atomic_store_ptr(&(pContext->dbConn), NULL); + } +} +void tscKillConnection(STscObj *pObj) { + // get stream header by locked + pthread_mutex_lock(&pObj->mutex); SSqlStream *pStream = pObj->streamList; + pthread_mutex_unlock(&pObj->mutex); + while (pStream) { SSqlStream *tmp = pStream->next; + // set associate variant to NULL + cqConnKilledNotify(pStream->cqhandle, pObj); + // taos_close_stream function call pObj->mutet lock , careful death-lock taos_close_stream(pStream); pStream = tmp; } - pthread_mutex_unlock(&pObj->mutex); - tscDebug("connection:%p is killed", pObj); taos_close(pObj); } + diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index b60824b2fa1e0d03d5a80e2e7677967e4c7ba031..ed024e21bacb3a08a3817505a0d29e7f8cbc0fef 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -141,6 +141,10 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pStream->numOfRes = 0; // reset the numOfRes. SSqlObj *pSql = pStream->pSql; + // pSql == NULL maybe killStream already called + if(pSql == NULL) { + return ; + } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); tscDebug("0x%"PRIx64" add into timer", pSql->self); @@ -662,7 +666,7 @@ void cbParseSql(void* param, TAOS_RES* res, int code) { } TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)) { + int64_t stime, void *param, void (*callback)(void *), void* cqhandle) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; @@ -695,6 +699,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c pStream->callback = callback; pStream->param = param; pStream->pSql = pSql; + pStream->cqhandle = cqhandle; pSql->pStream = pStream; pSql->param = pStream; pSql->maxRetry = TSDB_MAX_REPLICA; @@ -735,7 +740,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { - return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback); + return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL); } void taos_close_stream(TAOS_STREAM *handle) { diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index ee4be02b9004b56c0139b792393d00d9ea6a6f98..f539e7725315d2358767624ce74a8e9609a0b425 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -38,21 +38,6 @@ #define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} #define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} -typedef struct { - int32_t vgId; - int32_t master; - int32_t num; // number of continuous streams - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; - char db[TSDB_DB_NAME_LEN]; - FCqWrite cqWrite; - struct SCqObj *pHead; - void *dbConn; - void *tmrCtrl; - pthread_mutex_t mutex; - int32_t delete; - int32_t cqObjNum; -} SCqContext; typedef struct SCqObj { tmr_h tmrId; @@ -439,7 +424,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { // inner implement in tscStream.c TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)); + int64_t stime, void *param, void (*callback)(void *), void* cqhandle); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; @@ -453,7 +438,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \ + INT64_MIN, (void *)pObj->rid, NULL, pContext); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 552a40665ae10c72203ac60afaec59a4381ead5f..7549c3d498a02e948212c38b071701aa55adc2fc 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -31,6 +31,23 @@ typedef struct { FCqWrite cqWrite; } SCqCfg; +// SCqContext +typedef struct { + int32_t vgId; + int32_t master; + int32_t num; // number of continuous streams + char user[TSDB_USER_LEN]; + char pass[TSDB_KEY_LEN]; + char db[TSDB_DB_NAME_LEN]; + FCqWrite cqWrite; + struct SCqObj *pHead; + void *dbConn; + void *tmrCtrl; + pthread_mutex_t mutex; + int32_t delete; + int32_t cqObjNum; +} SCqContext; + // the following API shall be called by vnode void *cqOpen(void *ahandle, const SCqCfg *pCfg); void cqClose(void *handle);