提交 33367d70 编写于 作者: H hjxilinx

[tbase-1428]

上级 e7f98c20
......@@ -211,6 +211,12 @@
# whether to enable HTTP compression transmission
# httpEnableCompress 0
# the delayed time for launching each continuous query. 10% of the whole computing time window by default.
# streamCompDelayRatio 0.1
# the max allowed delayed time for launching continuous query. 20ms by default
# tsMaxStreamComputDelay 20000
# whether the telegraf table name contains the number of tags and the number of fields
# telegrafUseFieldNum 0
......
......@@ -31,6 +31,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1;
}
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -88,7 +92,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
return;
}
......@@ -144,7 +148,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0, 0);
tscClearMeterMetaInfo(pMeterMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
}
......@@ -174,7 +178,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscClearMeterMetaInfo(pMeterMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return;
}
......@@ -243,6 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
}
......@@ -263,6 +268,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
int64_t delay = getDelayValueAfterTimewindowClosed(pStream, timer);
if (isProjectStream(pQueryInfo)) {
int64_t now = taosGetTimestamp(pStream->precision);
......@@ -282,12 +288,12 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
}
return;
}
tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
now + timer, timer, pStream->stime, etime);
tscTrace("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
now + timer, timer, delay, pStream->stime, etime);
} else {
tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1);
tscTrace("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
pStream->stime, timer, delay, pStream->stime - pStream->interval, pStream->stime - 1);
}
pSql->cmd.command = TSDB_SQL_SELECT;
......@@ -296,6 +302,29 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
taosTmrReset(tscProcessStreamTimer, timer, pStream, tscTmr, &pStream->pTimer);
}
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
int64_t delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio);
int64_t maxDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
}
int64_t remainTimeWindow = pStream->slidingTime - delayDelta;
if (maxDelay > remainTimeWindow) {
maxDelay = (remainTimeWindow / 1.5);
}
int64_t currentDelay = (rand() % maxDelay); // a random number
currentDelay += delayDelta;
assert(currentDelay < pStream->slidingTime);
return currentDelay;
}
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t timer = 0;
......@@ -330,24 +359,15 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
}
return;
}
timer = pStream->stime - taosGetTimestamp(pStream->precision);
if (timer < 0) {
timer = 0;
}
}
int64_t delayDelta = (int64_t)(pStream->slidingTime * 0.1);
delayDelta = (rand() % delayDelta);
int64_t maxDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
}
timer += delayDelta; // a random number
timer += getLaunchTimeDelay(pStream);
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
timer = timer / 1000L;
}
......@@ -428,24 +448,12 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision);
if (timer < 0) timer = 0;
int64_t delayDelta = (int64_t)(pStream->interval * 0.1);
int64_t maxDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
}
int64_t startDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
srand(time(NULL));
timer += (rand() % delayDelta); // a random number
if (timer < startDelay || timer > maxDelay) {
timer = (timer % startDelay) + startDelay;
}
timer += getLaunchTimeDelay(pStream);
timer += startDelay;
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
......
......@@ -135,6 +135,7 @@ extern int tsMinIntervalTime;
extern int tsMaxStreamComputDelay;
extern int tsStreamCompStartDelay;
extern int tsStreamCompRetryDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
......
......@@ -138,18 +138,29 @@ int tsMaxSQLStringLen = TSDB_MAX_SQL_LEN;
*/
int tsCompressMsgSize = -1;
char tsSocketType[4] = "udp"; // use UDP by default[option: udp, tcp]
int tsTimePrecision = TSDB_TIME_PRECISION_MILLI; // time precision, millisecond by default
int tsMinSlidingTime = 10; // 10 ms for sliding time, the value will changed in
// case of time precision changed
int tsMinIntervalTime = 10; // 10 ms for interval time range, changed accordingly
int tsMaxStreamComputDelay = 20000; // 20sec, the maximum value of stream
// computing delay, changed accordingly
int tsStreamCompStartDelay = 10000; // 10sec, the first stream computing delay
// time after system launched successfully,
// changed accordingly
int tsStreamCompRetryDelay = 10; // the stream computing delay time after
// executing failed, change accordingly
// use UDP by default[option: udp, tcp]
char tsSocketType[4] = "udp";
// time precision, millisecond by default
int tsTimePrecision = TSDB_TIME_PRECISION_MILLI;
// 10 ms for sliding time, the value will changed in case of time precision changed
int tsMinSlidingTime = 10;
// 10 ms for interval time range, changed accordingly
int tsMinIntervalTime = 10;
// 20sec, the maximum value of stream computing delay, changed accordingly
int tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int tsStreamCompRetryDelay = 10;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1;
int tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
......@@ -617,9 +628,12 @@ static void doInitGlobalConfig() {
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
1000, 1000000000, 0, TSDB_CFG_UTYPE_MS);
tsInitConfigOption(cfg++, "retryStreamCompDelay", &tsStreamCompRetryDelay, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
10, 1000000000, 0, TSDB_CFG_UTYPE_MS);
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 10, 1000000000, 0, TSDB_CFG_UTYPE_MS);
tsInitConfigOption(cfg++, "streamCompDelayRatio", &tsStreamComputDelayRatio, TSDB_CFG_VTYPE_FLOAT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0.1, 0.9, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
0, 1, 0, TSDB_CFG_UTYPE_NONE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册