From 261e50e23e5bc34723032c4f96e8908d24bb69ce Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Sat, 22 May 2021 15:11:45 +0800 Subject: [PATCH] cq can continue with output table last row time --- src/client/src/tscStream.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f0f87f26db..267fa0c0fe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -539,6 +539,31 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } +// +// get tableName last row time, if have error return zero. +// +static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { + + int64_t last_time = 0; + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", tableName); + + // query sql + TAOS_RES* res = taos_query(pSql->pTscObj, sql); + if(res == NULL) + return 0; + + // only fetch one row + TAOS_ROW row = taos_fetch_row(res); + if( row[0] ) { + last_time = *((int64_t*)row[0]); + } + + // free and return + taos_free_result(res); + return last_time; +} + static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -572,6 +597,13 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + // set output table last record time to stime if have, why do this, because continue with last break + int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable); + if(last_time > 0 && last_time > pStream->stime) { + // can replace stime with last row time + pStream->stime = last_time; + } + int64_t starttime = tscGetLaunchTimestamp(pStream); pCmd->command = TSDB_SQL_SELECT; -- GitLab