diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0401d1f3b27e590f4642644023a44a189ebdcc59..b60824b2fa1e0d03d5a80e2e7677967e4c7ba031 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -111,7 +111,9 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { // failed to get table Meta or vgroup list, retry in 10sec. if (code == TSDB_CODE_SUCCESS) { tscTansformFuncForSTableQuery(pQueryInfo); - tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name)); + + + tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s QueryInfo->skey=%"PRId64" ekey=%"PRId64" ", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name), pQueryInfo->window.skey, pQueryInfo->window.ekey); pQueryInfo->command = TSDB_SQL_SELECT; @@ -165,7 +167,11 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { if (etime > pStream->etime) { etime = pStream->etime; } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') { - etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; + if(pStream->stime == INT64_MIN) { + etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); + } else { + etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; + } } else { etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); } @@ -349,8 +355,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream, now + timer, timer, delay, pStream->stime, etime); } else { - tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream, - pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1); + tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 " - %" PRId64 " end, in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream, + pStream->stime, pStream->etime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1); } pSql->cmd.command = TSDB_SQL_SELECT; @@ -660,6 +666,11 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; + if(fp == NULL){ + tscError(" taos_open_stream api fp param must not NULL."); + return NULL; + } + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { return NULL;