diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9094f95dfc32a3e3fcf33c5cb17b290faf6196a6..7e6132b7c84d5401d67c80a26933f7073d1613e5 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -25,6 +25,7 @@ #include "tutil.h" #include "tscProfile.h" +#include "tscSubquery.h" static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); @@ -538,31 +539,7 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } -///* -// -// get tableName last row time, if have error return zero. -// -static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { - int64_t last_time = 0; - char sql[128] = ""; - sprintf(sql, "select last_row(*) from %s;", tableName); - - // query sql - TAOS_RES* res = taos_query(pSql->pTscObj, sql); - if(res == NULL) - return 0; - - // only fetch one row - TAOS_ROW row = taos_fetch_row(res); - if( row && row[0] ) { - last_time = *((int64_t*)row[0]); - } - // free and return - taos_free_result(res); - return last_time; -} -//*/ static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -596,15 +573,12 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); - // set output table last record time to stime if have, why do this, because continue with last brea + // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; - int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable); - pStream->ltime = last_time; - tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time); - if(last_time > 0 && last_time > pStream->stime) { - // can replace stime with last row time - tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time); - pStream->stime = last_time; + tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); + if(pStream->ltime > 0 && pStream->ltime > pStream->stime) { + tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" ", dstTable, pStream->stime, pStream->ltime); + pStream->stime = pStream->ltime; } int64_t starttime = tscGetLaunchTimestamp(pStream); @@ -622,25 +596,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { pStream->dstTable = dstTable; } -// already run on another thread -void tscCreateStreamThread(SSchedMsg* pMsg) { - tscDebug(" new thread Sched call tscCreateStream begin..."); - tscCreateStream(pMsg->ahandle, NULL, 0); - tscDebug(" new thread Sched call tscCreateStream end."); - return ; +// fetchFp call back +void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) { + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = res; + + // get row data set to ltime + tscSetSqlOwner(pSql); + TAOS_ROW row = doSetResultRowData(pSql); + if( row && row[0] ) { + pStream->ltime = *((int64_t*)row[0]); + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime); + } + tscClearSqlOwner(pSql); + + // no condition call + tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS); + taos_free_result(res); } -// parsesql async response return and change run thread -void tsParseSqlRet(void* param, TAOS_RES* res, int code) { - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscCreateStreamThread; - schedMsg.ahandle = param; - schedMsg.thandle = res; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); +// fp callback +void fpStreamLastRow(void* param ,TAOS_RES* res, int code) { + // check result successful + if (code != TSDB_CODE_SUCCESS) { + tscCreateStream(param, res, TSDB_CODE_SUCCESS); + taos_free_result(res); + return ; + } + + // asynchronous fetch last row data + taos_fetch_rows_a(res, fetchFpStreamLastRow, param); } -TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), +void cbParseSql(void* param, TAOS_RES* res, int code) { + // check result successful + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = pStream->pSql; + SSqlCmd* pCmd = &pSql->cmd; + if (code != TSDB_CODE_SUCCESS) { + pSql->res.code = code; + tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); + pStream->fp(pStream->param, NULL, NULL); + return; + } + + // check dstTable valid + if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) { + tscDebug(" cbParseSql dstTable is empty."); + tscCreateStream(param, res, code); + return ; + } + + // query stream last row time async + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); + taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); + return ; +} + +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; @@ -671,6 +686,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p pSql->pStream = pStream; pSql->param = pStream; pSql->maxRetry = TSDB_MAX_REPLICA; + tscSetStreamDestTable(pStream, dstTable); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { @@ -685,16 +701,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - pSql->fp = tsParseSqlRet; - pSql->fetchFp = tsParseSqlRet; + pSql->fp = cbParseSql; + pSql->fetchFp = cbParseSql; registerSqlObj(pSql); - + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { - tscCreateStream(pStream, pSql, code); + cbParseSql(pStream, pSql, code); } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - tscDebug(" cq parseSql IN Process pass. "); + tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr); } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); @@ -705,6 +721,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return pStream; } +TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)) { + return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback); +} + void taos_close_stream(TAOS_STREAM *handle) { SSqlStream *pStream = (SSqlStream *)handle; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5d5d5f339eec62db09f4c93acad0cea096c0962d..ee4be02b9004b56c0139b792393d00d9ea6a6f98 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { taosReleaseRef(cqObjRef, (int64_t)param); } +// inner implement in tscStream.c +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)); + static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; @@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { - tscSetStreamDestTable(pObj->pStream, pObj->dstTable); pContext->num++; cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr); } else {