diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f0f87f26db3990a5546a6dc7be75dfae68fa88a1..267fa0c0fe1d084402d4dd1363bfd543c3fa7837 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -539,6 +539,31 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } +// +// get tableName last row time, if have error return zero. +// +static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { + + int64_t last_time = 0; + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", tableName); + + // query sql + TAOS_RES* res = taos_query(pSql->pTscObj, sql); + if(res == NULL) + return 0; + + // only fetch one row + TAOS_ROW row = taos_fetch_row(res); + if( row[0] ) { + last_time = *((int64_t*)row[0]); + } + + // free and return + taos_free_result(res); + return last_time; +} + static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -572,6 +597,13 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + // set output table last record time to stime if have, why do this, because continue with last break + int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable); + if(last_time > 0 && last_time > pStream->stime) { + // can replace stime with last row time + pStream->stime = last_time; + } + int64_t starttime = tscGetLaunchTimestamp(pStream); pCmd->command = TSDB_SQL_SELECT;