From 33367d7033a0191538ea99f329c10e44c3804240 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 10 Jan 2020 17:46:25 +0800 Subject: [PATCH] [tbase-1428] --- packaging/cfg/taos.cfg | 6 +++ src/client/src/tscStream.c | 80 +++++++++++++++++++++----------------- src/inc/tglobalcfg.h | 1 + src/util/src/tglobalcfg.c | 44 ++++++++++++++------- 4 files changed, 80 insertions(+), 51 deletions(-) diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 1e2fb4fed4..588f0650bd 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -211,6 +211,12 @@ # whether to enable HTTP compression transmission # httpEnableCompress 0 +# the delayed time for launching each continuous query. 10% of the whole computing time window by default. +# streamCompDelayRatio 0.1 + +# the max allowed delayed time for launching continuous query. 20ms by default +# tsMaxStreamComputDelay 20000 + # whether the telegraf table name contains the number of tags and the number of fields # telegrafUseFieldNum 0 diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 69e3ea1eb3..f07c049ff6 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -31,6 +31,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer); +static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) { + return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1; +} + static bool isProjectStream(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); @@ -88,7 +92,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code != TSDB_CODE_SUCCESS) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); - + tscSetRetryTimer(pStream, pSql, retryDelayTime); return; } @@ -144,7 +148,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0, 0); tscClearMeterMetaInfo(pMeterMetaInfo, true); - + tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; } @@ -174,7 +178,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscClearMeterMetaInfo(pMeterMetaInfo, true); - + tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; } @@ -243,6 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry); tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); + tscSetRetryTimer(pStream, pStream->pSql, retry); return; } @@ -263,6 +268,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + int64_t delay = getDelayValueAfterTimewindowClosed(pStream, timer); if (isProjectStream(pQueryInfo)) { int64_t now = taosGetTimestamp(pStream->precision); @@ -282,12 +288,12 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) } return; } - - tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, - now + timer, timer, pStream->stime, etime); + + tscTrace("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, + now + timer, timer, delay, pStream->stime, etime); } else { - tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, - pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1); + tscTrace("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, + pStream->stime, timer, delay, pStream->stime - pStream->interval, pStream->stime - 1); } pSql->cmd.command = TSDB_SQL_SELECT; @@ -296,6 +302,29 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) taosTmrReset(tscProcessStreamTimer, timer, pStream, tscTmr, &pStream->pTimer); } +static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { + int64_t delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio); + + int64_t maxDelay = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; + + if (delayDelta > maxDelay) { + delayDelta = maxDelay; + } + + int64_t remainTimeWindow = pStream->slidingTime - delayDelta; + if (maxDelay > remainTimeWindow) { + maxDelay = (remainTimeWindow / 1.5); + } + + int64_t currentDelay = (rand() % maxDelay); // a random number + currentDelay += delayDelta; + assert(currentDelay < pStream->slidingTime); + + return currentDelay; +} + + static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { int64_t timer = 0; @@ -330,24 +359,15 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { } return; } - + timer = pStream->stime - taosGetTimestamp(pStream->precision); if (timer < 0) { timer = 0; } } - int64_t delayDelta = (int64_t)(pStream->slidingTime * 0.1); - delayDelta = (rand() % delayDelta); - - int64_t maxDelay = - (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; - - if (delayDelta > maxDelay) { - delayDelta = maxDelay; - } - - timer += delayDelta; // a random number + timer += getLaunchTimeDelay(pStream); + if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { timer = timer / 1000L; } @@ -428,24 +448,12 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision); if (timer < 0) timer = 0; - int64_t delayDelta = (int64_t)(pStream->interval * 0.1); - - int64_t maxDelay = - (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; - if (delayDelta > maxDelay) { - delayDelta = maxDelay; - } - int64_t startDelay = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay; - - srand(time(NULL)); - timer += (rand() % delayDelta); // a random number - - if (timer < startDelay || timer > maxDelay) { - timer = (timer % startDelay) + startDelay; - } - + + timer += getLaunchTimeDelay(pStream); + timer += startDelay; + return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index c4407f7d55..423079c448 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -135,6 +135,7 @@ extern int tsMinIntervalTime; extern int tsMaxStreamComputDelay; extern int tsStreamCompStartDelay; extern int tsStreamCompRetryDelay; +extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern int tsProjectExecInterval; extern int64_t tsMaxRetentWindow; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 7e18e2e74b..daf2087747 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -138,18 +138,29 @@ int tsMaxSQLStringLen = TSDB_MAX_SQL_LEN; */ int tsCompressMsgSize = -1; -char tsSocketType[4] = "udp"; // use UDP by default[option: udp, tcp] -int tsTimePrecision = TSDB_TIME_PRECISION_MILLI; // time precision, millisecond by default -int tsMinSlidingTime = 10; // 10 ms for sliding time, the value will changed in - // case of time precision changed -int tsMinIntervalTime = 10; // 10 ms for interval time range, changed accordingly -int tsMaxStreamComputDelay = 20000; // 20sec, the maximum value of stream - // computing delay, changed accordingly -int tsStreamCompStartDelay = 10000; // 10sec, the first stream computing delay - // time after system launched successfully, - // changed accordingly -int tsStreamCompRetryDelay = 10; // the stream computing delay time after - // executing failed, change accordingly +// use UDP by default[option: udp, tcp] +char tsSocketType[4] = "udp"; + +// time precision, millisecond by default +int tsTimePrecision = TSDB_TIME_PRECISION_MILLI; + +// 10 ms for sliding time, the value will changed in case of time precision changed +int tsMinSlidingTime = 10; + +// 10 ms for interval time range, changed accordingly +int tsMinIntervalTime = 10; + +// 20sec, the maximum value of stream computing delay, changed accordingly +int tsMaxStreamComputDelay = 20000; + +// 10sec, the first stream computing delay time after system launched successfully, changed accordingly +int tsStreamCompStartDelay = 10000; + +// the stream computing delay time after executing failed, change accordingly +int tsStreamCompRetryDelay = 10; + +// The delayed computing ration. 10% of the whole computing time window by default. +float tsStreamComputDelayRatio = 0.1; int tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance @@ -617,9 +628,12 @@ static void doInitGlobalConfig() { TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 1000, 1000000000, 0, TSDB_CFG_UTYPE_MS); tsInitConfigOption(cfg++, "retryStreamCompDelay", &tsStreamCompRetryDelay, TSDB_CFG_VTYPE_INT, - TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, - 10, 1000000000, 0, TSDB_CFG_UTYPE_MS); - + TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 10, 1000000000, 0, TSDB_CFG_UTYPE_MS); + + + tsInitConfigOption(cfg++, "streamCompDelayRatio", &tsStreamComputDelayRatio, TSDB_CFG_VTYPE_FLOAT, + TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0.1, 0.9, 0, TSDB_CFG_UTYPE_NONE); + tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0, 1, 0, TSDB_CFG_UTYPE_NONE); -- GitLab