提交 f3bf2f9b 编写于 作者: A Alex Duan

fixed delay time bugs

上级 6af088ad
......@@ -30,6 +30,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);
static int64_t getLaunchTimeDelay(const SSqlStream* pStream);
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1;
......@@ -178,13 +179,30 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') {
timer = 86400 * 1000l;
} else {
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
int64_t next_time = 0;
while(1) {
// get next time
next_time = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.intervalUnit, TSDB_TIME_PRECISION_MILLI);
next_time = taosTimeTruncate(next_time, &pStream->interval, pStream->precision);
timer = next_time - taosGetTimestamp(pStream->precision); // next time - now()
if(timer < 0 ) {
tscDebug("CQ next time < now so loop add sliding. next_time=%" PRId64, next_time);
continue;
}
// calc launch delay time
int64_t delay = getLaunchTimeDelay((const SSqlStream*)pStream);
timer += delay;
tscDebug("CQ execute next query after %" PRId64 "ms (delay=%" PRId64 ")", timer, delay);
break;
}
}
tscSetRetryTimer(pStream, pSql, timer);
return;
}
}
tscDebug("CQ ProcessStreamTimer skey=%" PRId64 " ekey=%" PRId64 " stime=%" PRId64 " endtime=%" PRId64, pQueryInfo->window.skey, pQueryInfo->window.ekey, pStream->stime, pStream->etime);
// launch stream computing in a new thread
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessStreamLaunchQuery;
......@@ -216,7 +234,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
pStream->pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
}
......@@ -479,7 +496,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pStream, numOfRows, retryDelayTime);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return;
}
......@@ -552,7 +568,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// todo set retry dynamic time
int32_t retry = tsProjectExecInterval;
tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry);
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
}
......@@ -617,28 +632,33 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
taosTmrReset(tscProcessStreamTimer, (int32_t)timer, pStream, tscTmr, &pStream->pTimer);
}
// get need delay time for every launch to exeucte query, include first and next launch
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
// step 1 read setting delay time in taos.cfg
int64_t maxDelay = convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
int64_t delayDelta = maxDelay;
int64_t ratioDelay = maxDelay;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
ratioDelay= (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
if (ratioDelay > maxDelay) {
ratioDelay = maxDelay;
}
int64_t remainTimeWindow = pStream->interval.sliding - delayDelta;
int64_t remainTimeWindow = pStream->interval.sliding - ratioDelay;
if (maxDelay > remainTimeWindow) {
maxDelay = (int64_t)(remainTimeWindow / 1.5f);
}
}
int64_t currentDelay = (rand() % maxDelay); // a random number
currentDelay += delayDelta;
// PART 2 calc allDelay = rand delay + fixed delay
int64_t allDelay = (rand() % maxDelay) + ratioDelay;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
assert(currentDelay < pStream->interval.sliding);
if(allDelay >= pStream->interval.sliding) {
tscWarn("CQ delay >= sliding error. delay=%" PRId64 " sliding=%" PRId64 ". so set delay=sliding/2.", allDelay, pStream->interval.sliding);
allDelay = pStream->interval.sliding / 2;
}
}
return currentDelay;
tscDebug("getLanchDelay allDelay=%" PRId64 "(ratioDelay=%" PRId64 ")", allDelay, ratioDelay);
return allDelay;
}
......@@ -687,7 +707,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
timer += getLaunchTimeDelay(pStream);
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
tscSetRetryTimer(pStream, pSql, timer);
}
......@@ -779,16 +798,18 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
return stime;
}
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
static int64_t tscGetFirstLaunchTime(const SSqlStream *pStream) {
// PART 1 now to stime span
int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
if (pStream->stime > now) {
timer = pStream->stime - now;
}
int64_t startDelay = convertTimePrecision(tsStreamCompStartDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
// PART 2 stream first Launch need delay, setting with taos.cfg
timer += convertTimePrecision(tsFirstLaunchDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
// PART 3 every launch need delay, include first and next launch
timer += getLaunchTimeDelay(pStream);
timer += startDelay;
return convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
}
......@@ -834,7 +855,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->stime = pStream->ltime;
}
int64_t starttime = tscGetLaunchTimestamp(pStream);
int64_t starttime = tscGetFirstLaunchTime(pStream);
pCmd->command = TSDB_SQL_SELECT;
tscAddIntoStreamList(pStream);
......
......@@ -78,7 +78,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsFirstLaunchDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval;
......
......@@ -98,8 +98,8 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// 10sec, the stream first launched to execute delay time after system launched successfully, changed accordingly
int32_t tsFirstLaunchDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
......@@ -739,7 +739,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg);
cfg.option = "maxFirstStreamCompDelay";
cfg.ptr = &tsStreamCompStartDelay;
cfg.ptr = &tsFirstLaunchDelay; // stream first launch delay time
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1000;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册