diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 88c72a91a6ab7a3b6b0a741dcfb2215662c63326..1296caf7600d6ce9417981cfc2b401c641d04d05 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -30,6 +30,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer); +static int64_t getLaunchTimeDelay(const SSqlStream* pStream); static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) { return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1; @@ -178,13 +179,30 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') { timer = 86400 * 1000l; } else { - timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); + int64_t next_time = 0; + while(1) { + // get next time + next_time = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.intervalUnit, TSDB_TIME_PRECISION_MILLI); + next_time = taosTimeTruncate(next_time, &pStream->interval, pStream->precision); + timer = next_time - taosGetTimestamp(pStream->precision); // next time - now() + if(timer < 0 ) { + tscDebug("CQ next time < now so loop add sliding. next_time=%" PRId64, next_time); + continue; + } + + // calc launch delay time + int64_t delay = getLaunchTimeDelay((const SSqlStream*)pStream); + timer += delay; + tscDebug("CQ execute next query after %" PRId64 "ms (delay=%" PRId64 ")", timer, delay); + break; + } } tscSetRetryTimer(pStream, pSql, timer); return; } } + tscDebug("CQ ProcessStreamTimer skey=%" PRId64 " ekey=%" PRId64 " stime=%" PRId64 " endtime=%" PRId64, pQueryInfo->window.skey, pQueryInfo->window.ekey, pStream->stime, pStream->etime); // launch stream computing in a new thread SSchedMsg schedMsg = { 0 }; schedMsg.fp = tscProcessStreamLaunchQuery; @@ -195,7 +213,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { } static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { - SSqlStream *pStream = (SSqlStream *)param; + SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self, @@ -216,7 +234,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf pStream->pSql->subState.numOfSub = 0; pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); - tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; } @@ -479,7 +496,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); tscError("stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pStream, numOfRows, retryDelayTime); - tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; } @@ -552,7 +568,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf // todo set retry dynamic time int32_t retry = tsProjectExecInterval; tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry); - tscSetRetryTimer(pStream, pStream->pSql, retry); return; } @@ -617,28 +632,33 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) taosTmrReset(tscProcessStreamTimer, (int32_t)timer, pStream, tscTmr, &pStream->pTimer); } +// get need delay time for every launch to exeucte query, include first and next launch static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { + // step 1 read setting delay time in taos.cfg int64_t maxDelay = convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); - - int64_t delayDelta = maxDelay; + int64_t ratioDelay = maxDelay; if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { - delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio); - if (delayDelta > maxDelay) { - delayDelta = maxDelay; + ratioDelay= (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio); + if (ratioDelay > maxDelay) { + ratioDelay = maxDelay; } - int64_t remainTimeWindow = pStream->interval.sliding - delayDelta; + int64_t remainTimeWindow = pStream->interval.sliding - ratioDelay; if (maxDelay > remainTimeWindow) { maxDelay = (int64_t)(remainTimeWindow / 1.5f); } } - int64_t currentDelay = (rand() % maxDelay); // a random number - currentDelay += delayDelta; + // PART 2 calc allDelay = rand delay + fixed delay + int64_t allDelay = (rand() % maxDelay) + ratioDelay; if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { - assert(currentDelay < pStream->interval.sliding); + if(allDelay >= pStream->interval.sliding) { + tscWarn("CQ delay >= sliding error. delay=%" PRId64 " sliding=%" PRId64 ". so set delay=sliding/2.", allDelay, pStream->interval.sliding); + allDelay = pStream->interval.sliding / 2; + } } - - return currentDelay; + + tscDebug("getLanchDelay allDelay=%" PRId64 "(ratioDelay=%" PRId64 ")", allDelay, ratioDelay); + return allDelay; } @@ -687,7 +707,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { timer += getLaunchTimeDelay(pStream); timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); - tscSetRetryTimer(pStream, pSql, timer); } @@ -779,16 +798,18 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in return stime; } -static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { +static int64_t tscGetFirstLaunchTime(const SSqlStream *pStream) { + // PART 1 now to stime span int64_t timer = 0, now = taosGetTimestamp(pStream->precision); if (pStream->stime > now) { timer = pStream->stime - now; } - int64_t startDelay = convertTimePrecision(tsStreamCompStartDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); + // PART 2 stream first Launch need delay, setting with taos.cfg + timer += convertTimePrecision(tsFirstLaunchDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); + // PART 3 every launch need delay, include first and next launch timer += getLaunchTimeDelay(pStream); - timer += startDelay; return convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); } @@ -834,7 +855,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = pStream->ltime; } - int64_t starttime = tscGetLaunchTimestamp(pStream); + int64_t starttime = tscGetFirstLaunchTime(pStream); pCmd->command = TSDB_SQL_SELECT; tscAddIntoStreamList(pStream); diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 9775a3898fb759873ffe48673062dd8402873041..a0a4b57f8509ca4461d9c7fb91ecd35db6c41e05 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -78,7 +78,7 @@ extern int32_t tsMaxNumOfOrderedResults; extern int32_t tsMinSlidingTime; extern int32_t tsMinIntervalTime; extern int32_t tsMaxStreamComputDelay; -extern int32_t tsStreamCompStartDelay; +extern int32_t tsFirstLaunchDelay; extern int32_t tsRetryStreamCompDelay; extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern int32_t tsProjectExecInterval; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index ce942b60c6c91aa6f0e3773833359ada939c3d16..7a270bb368ae1be7a0aae9ca8d869ac0b09a21ab 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -98,8 +98,8 @@ int32_t tsMinIntervalTime = 1; // 20sec, the maximum value of stream computing delay, changed accordingly int32_t tsMaxStreamComputDelay = 20000; -// 10sec, the first stream computing delay time after system launched successfully, changed accordingly -int32_t tsStreamCompStartDelay = 10000; +// 10sec, the stream first launched to execute delay time after system launched successfully, changed accordingly +int32_t tsFirstLaunchDelay = 10000; // the stream computing delay time after executing failed, change accordingly int32_t tsRetryStreamCompDelay = 10 * 1000; @@ -739,7 +739,7 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); cfg.option = "maxFirstStreamCompDelay"; - cfg.ptr = &tsStreamCompStartDelay; + cfg.ptr = &tsFirstLaunchDelay; // stream first launch delay time cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 1000;