diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 8c194c031d81d899af22b0be90be87072d8a8b51..e38020669631376cf111fbb543ec3c6deb400f41 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); if (!pSql->cmd.parseFinished) { tsParseSql(pSql, false); - sem_post(&pSql->rspSem); } + (*pSql->fp)(pSql->param, pSql, code); return; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 6cc27a4cfe0991072bee92b7e831fb0736923b38..7c188ec96975f28e1bc5d692ed4e2c98ac505afb 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { SSqlObj * pSql = pStream->pSql; pSql->fp = tscProcessStreamQueryCallback; + pSql->fetchFp = tscProcessStreamQueryCallback; pSql->param = pStream; pSql->res.completed = false; @@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { } } +static void tscCreateStream(void *param, TAOS_RES *res, int code) { + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = pStream->pSql; + SSqlCmd* pCmd = &pSql->cmd; + + if (code != TSDB_CODE_SUCCESS) { + setErrorInfo(pSql, code, pCmd->payload); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code); + pStream->fp(pStream->param, NULL, NULL); + return; + } + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + + pStream->isProject = isProjectStream(pQueryInfo); + pStream->precision = tinfo.precision; + + pStream->ctime = taosGetTimestamp(pStream->precision); + pStream->etime = pQueryInfo->window.ekey; + + tscAddIntoStreamList(pStream); + + tscSetSlidingWindowInfo(pSql, pStream); + pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + + int64_t starttime = tscGetLaunchTimestamp(pStream); + pCmd->command = TSDB_SQL_SELECT; + taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); + + tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, + pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } pSql->signature = pSql; - pSql->param = pSql; pSql->pTscObj = pObj; SSqlCmd *pCmd = &pSql->cmd; @@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscFreeSqlObj(pSql); return NULL; } + + pStream->stime = stime; + pStream->fp = fp; + pStream->callback = callback; + pStream->param = param; + pStream->pSql = pSql; pSql->pStream = pStream; + pSql->param = pStream; pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { @@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); + pSql->fp = tscCreateStream; + pSql->fetchFp = tscCreateStream; int32_t code = tsParseSql(pSql, true); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - sem_wait(&pSql->rspSem); - } - - if (pRes->code != TSDB_CODE_SUCCESS) { - setErrorInfo(pSql, pRes->code, pCmd->payload); - - tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); + if (code == TSDB_CODE_SUCCESS) { + tscCreateStream(pStream, pSql, code); + } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code)); tscFreeSqlObj(pSql); + free(pStream); return NULL; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - pStream->isProject = isProjectStream(pQueryInfo); - pStream->fp = fp; - pStream->callback = callback; - pStream->param = param; - pStream->pSql = pSql; - pStream->precision = tinfo.precision; - - pStream->ctime = taosGetTimestamp(pStream->precision); - pStream->etime = pQueryInfo->window.ekey; - - tscAddIntoStreamList(pStream); - - tscSetSlidingWindowInfo(pSql, pStream); - pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); - - int64_t starttime = tscGetLaunchTimestamp(pStream); - pCmd->command = TSDB_SQL_SELECT; - taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); - - tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, - pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr); - return pStream; } diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 04d3a6fd6dd40382f7e9bc03820e64df59a04950..51cd471a6b66526228b85be1c8dbc6f033d2515f 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -23,6 +23,7 @@ #include "taos.h" #include "taosdef.h" #include "taosmsg.h" +#include "ttimer.h" #include "tcq.h" #include "tdataformat.h" #include "tglobal.h" @@ -45,10 +46,12 @@ typedef struct { struct SCqObj *pHead; void *dbConn; int master; + void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; typedef struct SCqObj { + tmr_h tmrId; uint64_t uid; int32_t tid; // table ID int rowSize; // bytes of a row @@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { - SCqContext *pContext = calloc(sizeof(SCqContext), 1); if (pContext == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } + pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); + tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass)); const char* db = pCfg->db; @@ -99,6 +103,9 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + // stop all CQs cqStop(pContext); @@ -154,8 +161,10 @@ void cqStop(void *handle) { taos_close_stream(pObj->pStream); pObj->pStream = NULL; cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; } - pObj = pObj->next; } @@ -211,8 +220,13 @@ void cqDrop(void *handle) { } // free the resources associated - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; + } cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); tdFreeSchema(pObj->pSchema); @@ -222,18 +236,30 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { +static void cqProcessCreateTimer(void *param, void *tmrId) { + SCqObj* pObj = (SCqObj*)param; + SCqContext* pContext = pObj->pContext; + if (pContext->dbConn == NULL) { pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); if (pContext->dbConn == NULL) { cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - return; } } + + cqCreateStream(pContext, pObj); +} - int64_t lastKey = 0; +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + + if (pContext->dbConn == NULL) { + pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); + return; + } + pObj->tmrId = 0; + + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); if (pObj->pStream) { pContext->num++; cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 6a6da0a2a52844e4a2c42bad64658c93b48fffae..8618de324446ca3f2504d15a6422bcca3a4b51b0 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -22,7 +22,6 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); -void dnodeStartStream(); void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 987a1899597f495730a391e6bdbde7fb862731c3..6476bb78314f76ae02cbaae0dd04547bc7565313 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -123,7 +123,6 @@ int32_t dnodeInitSystem() { dnodeStartModules(); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); - dnodeStartStream(); dInfo("TDengine is initialized successfully"); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 338fcd287cefe8813e29ff8da7f40a60cf59fad5..bcaff255f769b7833648081bd6c3e05ff888e94a 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() { return TSDB_CODE_SUCCESS; } -void dnodeStartStream() { - int32_t vnodeList[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes); - - if (status != TSDB_CODE_SUCCESS) { - dInfo("get dnode list failed"); - return; - } - - for (int32_t i = 0; i < numOfVnodes; ++i) { - vnodeStartStream(vnodeList[i]); - } - - dInfo("streams started"); -} - static void dnodeCloseVnodes() { int32_t vnodeList[TSDB_MAX_VNODES]= {0}; int32_t numOfVnodes = 0; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index b8cc1768e85376182a8a57adbc5d159485151048..a1e87a74374e9fb514141aaad937afba8fdf52e1 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); -void tsdbStartStream(TSDB_REPO_T *repo); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 1e6cfa97006e110f5ce1e36073f165540adeda26..77c72c24516e17fcac81d648a10061537b38249f 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -44,7 +44,6 @@ typedef struct { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); -int32_t vnodeStartStream(int32_t vgId); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 6b3160070514b13d8b3b961854346d98d2b400a3..ffaab375a3bfc305cbedecae6df8881e28772aa2 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); +static void tsdbStartStream(STsdbRepo *pRepo); +static void tsdbStopStream(STsdbRepo *pRepo); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { goto _err; } + tsdbStartStream(pRepo); // pRepo->state = TSDB_REPO_STATE_ACTIVE; tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo)); @@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { STsdbRepo *pRepo = (STsdbRepo *)repo; int vgId = REPO_ID(pRepo); + tsdbStopStream(repo); + if (toCommit) { tsdbAsyncCommit(pRepo); if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); @@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ return magic; } -void tsdbStartStream(TSDB_REPO_T *repo) { - STsdbRepo *pRepo = (STsdbRepo *)repo; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - for (int i = 0; i < pRepo->config.maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, - tsdbGetTableSchemaImpl(pTable, false, false, -1)); - } - } -} - STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { ASSERT(repo != NULL); return &((STsdbRepo *)repo)->config; @@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { return TSDB_GET_TABLE_LAST_KEY(pTable); } -#endif \ No newline at end of file +#endif + +static void tsdbStartStream(STsdbRepo *pRepo) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + tsdbGetTableSchemaImpl(pTable, false, false, -1)); + } + } +} + +static void tsdbStopStream(STsdbRepo *pRepo) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + (*pRepo->appH.cqDropFunc)(pTable->cqhandle); + } + } +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 49f648b74db924f22249d3264b93e6701cf0068a..539f9c68511a96369a707265bb1a7dad462f44d2 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -303,10 +303,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } #endif - // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) - cqStart(pVnode->cq); - pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; @@ -317,15 +313,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { return TSDB_CODE_SUCCESS; } -int32_t vnodeStartStream(int32_t vnode) { - SVnodeObj* pVnode = vnodeAcquireVnode(vnode); - if (pVnode != NULL) { - tsdbStartStream(pVnode->tsdb); - vnodeRelease(pVnode); - } - return TSDB_CODE_SUCCESS; -} - int32_t vnodeClose(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) return 0;