diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4bfd3bc88ff05c592d98fad4a0499c53894eb43a..0e63fa35514ea1b8f237de3031962e70140f4f4b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -422,6 +422,7 @@ typedef struct SSqlStream { int64_t ctime; // stream created time int64_t stime; // stream next executed time int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed + int64_t ltime; // stream last row time in stream table SInterval interval; void * pTimer; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0f6a403582f34285431e0b407a04d1be8f894be8..9094f95dfc32a3e3fcf33c5cb17b290faf6196a6 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -538,12 +538,11 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } - +///* // // get tableName last row time, if have error return zero. // static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { - int64_t last_time = 0; char sql[128] = ""; sprintf(sql, "select last_row(*) from %s;", tableName); @@ -555,7 +554,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con // only fetch one row TAOS_ROW row = taos_fetch_row(res); - if( row[0] ) { + if( row && row[0] ) { last_time = *((int64_t*)row[0]); } @@ -563,7 +562,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con taos_free_result(res); return last_time; } - +//*/ static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -597,10 +596,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); - // set output table last record time to stime if have, why do this, because continue with last break - int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable); + // set output table last record time to stime if have, why do this, because continue with last brea + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable); + pStream->ltime = last_time; + tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time); if(last_time > 0 && last_time > pStream->stime) { // can replace stime with last row time + tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time); pStream->stime = last_time; } @@ -619,6 +622,24 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { pStream->dstTable = dstTable; } +// already run on another thread +void tscCreateStreamThread(SSchedMsg* pMsg) { + tscDebug(" new thread Sched call tscCreateStream begin..."); + tscCreateStream(pMsg->ahandle, NULL, 0); + tscDebug(" new thread Sched call tscCreateStream end."); + return ; +} + +// parsesql async response return and change run thread +void tsParseSqlRet(void* param, TAOS_RES* res, int code) { + SSchedMsg schedMsg = { 0 }; + schedMsg.fp = tscCreateStreamThread; + schedMsg.ahandle = param; + schedMsg.thandle = res; + schedMsg.msg = NULL; + taosScheduleTask(tscQhandle, &schedMsg); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -664,15 +685,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - pSql->fp = tscCreateStream; - pSql->fetchFp = tscCreateStream; + pSql->fp = tsParseSqlRet; + pSql->fetchFp = tsParseSqlRet; registerSqlObj(pSql); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { tscCreateStream(pStream, pSql, code); - } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscDebug(" cq parseSql IN Process pass. "); + } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); free(pStream);