未验证 提交 e52b2973 编写于 作者: D DuanKuanJun 提交者: GitHub

Merge pull request #6346 from taosdata/cq

CQ Fix QueryInfo->window.ekey not align sliding value when pStream->stime == INT64_MIN 
...@@ -111,7 +111,9 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { ...@@ -111,7 +111,9 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
// failed to get table Meta or vgroup list, retry in 10sec. // failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscTansformFuncForSTableQuery(pQueryInfo); tscTansformFuncForSTableQuery(pQueryInfo);
tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s QueryInfo->skey=%"PRId64" ekey=%"PRId64" ", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name), pQueryInfo->window.skey, pQueryInfo->window.ekey);
pQueryInfo->command = TSDB_SQL_SELECT; pQueryInfo->command = TSDB_SQL_SELECT;
...@@ -165,7 +167,11 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -165,7 +167,11 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if (etime > pStream->etime) { if (etime > pStream->etime) {
etime = pStream->etime; etime = pStream->etime;
} else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') { } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; if(pStream->stime == INT64_MIN) {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
} else {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
}
} else { } else {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
} }
...@@ -349,8 +355,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) ...@@ -349,8 +355,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream, tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
now + timer, timer, delay, pStream->stime, etime); now + timer, timer, delay, pStream->stime, etime);
} else { } else {
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream, tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 " - %" PRId64 " end, in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1); pStream->stime, pStream->etime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1);
} }
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
...@@ -660,6 +666,11 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -660,6 +666,11 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
if(fp == NULL){
tscError(" taos_open_stream api fp param must not NULL.");
return NULL;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册