diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 99dab92362713a5f875d3bf8aab4794162f75f0f..0cf0a6d547f7104037051735e510d92476eef37b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,7 @@ typedef struct _sstream { // stream will be closed int64_t interval; int64_t slidingTime; + int16_t precision; void * pTimer; void (*fp)(); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1d45f9ec9802a0c7b895afd0ada9cfea8aac2eae..84741f098e269e8d6037823a6b8b1823f9f16613 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -19,8 +19,8 @@ #include "ttimer.h" #include "tutil.h" -#include "tsclient.h" #include "tscUtil.h" +#include "tsclient.h" #include "tscProfile.h" @@ -40,10 +40,15 @@ static bool isProjectStream(SSqlCmd *pCmd) { return true; } -static int64_t tscGetRetryDelayTime(int64_t slidingTime) { - float RETRY_RANGE_FACTOR = 0.3; +static int64_t tscGetRetryDelayTime(int64_t slidingTime, int16_t prec) { + float retryRangeFactor = 0.3; + + // change to ms + if (prec == TSDB_TIME_PRECISION_MICRO) { + slidingTime = slidingTime / 1000; + } - int64_t retryDelta = (int64_t)tsStreamCompRetryDelay * RETRY_RANGE_FACTOR; + int64_t retryDelta = (int64_t)tsStreamCompRetryDelay * retryRangeFactor; retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; if (slidingTime < retryDelta) { @@ -76,8 +81,8 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime); - tscError("%p stream:%p,get metermeta failed, retry in %ldms.", pStream->pSql, pStream, retryDelayTime); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + tscError("%p stream:%p,get metermeta failed, retry in %lldms", pStream->pSql, pStream, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime); return; @@ -105,7 +110,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { */ pSql->cmd.stime = pStream->stime; // start time - pSql->cmd.etime = taosGetTimestampMs(); // end time + pSql->cmd.etime = taosGetTimestamp(pStream->precision); // end time if (pSql->cmd.etime > pStream->etime) { pSql->cmd.etime = pStream->etime; } @@ -126,8 +131,9 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { - int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime); - tscError("%p stream:%p, query data failed, code:%d, retry in %ldms", pStream->pSql, pStream, numOfRows, retryDelay); + int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + tscError("%p stream:%p, query data failed, code:%d, retry in %lldms", pStream->pSql, pStream, numOfRows, + retryDelay); tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); @@ -154,8 +160,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf SSqlObj * pSql = (SSqlObj *)res; if (pSql == NULL || numOfRows < 0) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime); - tscError("%p stream:%p, retrieve data failed, code:%d, retry in %ldms", pSql, pStream, numOfRows, retryDelayTime); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + tscError("%p stream:%p, retrieve data failed, code:%d, retry in %lldms", pSql, pStream, numOfRows, retryDelayTime); tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); @@ -247,7 +253,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) { if (isProjectStream(&pSql->cmd)) { - int64_t now = taosGetTimestampMs(); + int64_t now = taosGetTimestamp(pStream->precision); int64_t etime = now > pStream->etime ? pStream->etime : now; if (pStream->etime < now && now - pStream->etime > tsMaxRetentWindow) { @@ -265,11 +271,11 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) return; } - tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", - pStream->pSql, pStream, now + timer, timer, pStream->stime, etime); + tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, + now + timer, timer, pStream->stime, etime); } else { - tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", - pStream->pSql, pStream, pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1); + tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, + pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1); } pSql->cmd.command = TSDB_SQL_SELECT; @@ -288,8 +294,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { */ timer = pStream->slidingTime; if (pStream->stime > pStream->etime) { - tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream", - pStream->pSql, pStream, pStream->stime, pStream->etime); + tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream", pStream->pSql, pStream, + pStream->stime, pStream->etime); // TODO : How to terminate stream here taos_close_stream(pStream); if (pStream->callback) { @@ -301,8 +307,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { } else { pStream->stime += pStream->slidingTime; if ((pStream->stime - pStream->interval) >= pStream->etime) { - tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream", - pStream->pSql, pStream, pStream->stime, pStream->etime); + tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream", pStream->pSql, pStream, + pStream->stime, pStream->etime); // TODO : How to terminate stream here taos_close_stream(pStream); if (pStream->callback) { @@ -312,7 +318,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { return; } - timer = pStream->stime - taosGetTimestampSec() * 1000L; + timer = pStream->stime - taosGetTimestamp(pStream->precision); if (timer < 0) { timer = 0; } @@ -320,108 +326,62 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { int64_t delayDelta = (int64_t)(pStream->slidingTime * 0.1); delayDelta = (rand() % delayDelta); - if (delayDelta > tsMaxStreamComputDelay) { - delayDelta = tsMaxStreamComputDelay; + + int64_t maxDelay = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; + + if (delayDelta > maxDelay) { + delayDelta = maxDelay; } timer += delayDelta; // a random number + if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { + timer = timer / 1000L; + } + tscSetRetryTimer(pStream, pSql, timer); } -TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) return NULL; - - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - if (tscEmbedded) { - tscError("%p server out of memory", pSql); - pSql->res.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - } else { - tscError("%p client out of memory", pSql); - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - return NULL; - } - - pSql->signature = pSql; - pSql->pTscObj = pObj; +static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); - - int32_t len = strlen(sqlstr); - pSql->sqlstr = malloc(strlen(sqlstr) + 1); - if (pSql->sqlstr == NULL) { - if (tscEmbedded) { - tscError("%p server out of memory", pSql); - pSql->res.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - } else { - tscError("%p client out of memory", pSql); - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - return NULL; - } - strcpy(pSql->sqlstr, sqlstr); - pSql->sqlstr[len] = 0; - sem_init(&pSql->rspSem, 0, 0); - sem_init(&pSql->emptyRspSem, 0, 1); + int64_t minIntervalTime = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime; + if (pCmd->nAggTimeInterval < minIntervalTime) { + tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%lld", pSql, pStream, + pCmd->nAggTimeInterval, minIntervalTime); - SSqlInfo SQLInfo = {0}; - tSQLParse(&SQLInfo, pSql->sqlstr); - pRes->code = tscToSQLCmd(pSql, &SQLInfo); - SQLInfoDestroy(&SQLInfo); - - if (pRes->code != 0) { - tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); - tscFreeSqlObj(pSql); - return NULL; + pCmd->nAggTimeInterval = minIntervalTime; } - SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); - if (pStream == NULL) return NULL; - - pStream->fp = fp; - pStream->callback = callback; - pStream->param = param; - pStream->pSql = pSql; - pStream->ctime = taosGetTimestampMs(); - pStream->etime = (pCmd->etime) ? pCmd->etime : INT64_MAX; - - pSql->pStream = pStream; - tscAddIntoStreamList(pStream); - - if (pCmd->nAggTimeInterval < tsMinIntervalTime) { - tscWarn("%p stream:%p, original sample interval:%ldms too small, reset to:%ldms", pSql, pStream, - pCmd->nAggTimeInterval, tsMinIntervalTime); - - pCmd->nAggTimeInterval = tsMinIntervalTime; - } pStream->interval = pCmd->nAggTimeInterval; // it shall be derived from sql string if (pCmd->nSlidingTime == 0) { pCmd->nSlidingTime = pCmd->nAggTimeInterval; } - if (pCmd->nSlidingTime < tsMinSlidingTime) { - tscWarn("%p stream:%p, original sliding value:%lldms too small, reset to:%lldms", - pSql, pStream, pCmd->nSlidingTime, tsMinSlidingTime); + int64_t minSlidingTime = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; + + if (pCmd->nSlidingTime < minSlidingTime) { + tscWarn("%p stream:%p, original sliding value:%lld too small, reset to:%lld", pSql, pStream, pCmd->nSlidingTime, + minSlidingTime); - pCmd->nSlidingTime = tsMinSlidingTime; + pCmd->nSlidingTime = minSlidingTime; } if (pCmd->nSlidingTime > pCmd->nAggTimeInterval) { - tscWarn("%p stream:%p, sliding value:%lldms can not be larger than interval range, reset to:%lldms", - pSql, pStream, pCmd->nSlidingTime, pCmd->nAggTimeInterval); + tscWarn("%p stream:%p, sliding value:%lld can not be larger than interval range, reset to:%lld", pSql, pStream, + pCmd->nSlidingTime, pCmd->nAggTimeInterval); pCmd->nSlidingTime = pCmd->nAggTimeInterval; } pStream->slidingTime = pCmd->nSlidingTime; +} + +static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { + SSqlCmd *pCmd = &pSql->cmd; if (isProjectStream(pCmd)) { // no data in table, flush all data till now to destination meter, 10sec delay @@ -429,15 +389,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, pStream->slidingTime = tsProjectExecInterval; if (stime != 0) { // first projection start from the latest event timestamp - assert(stime >= pCmd->etime); + assert(stime >= pCmd->stime); stime += 1; // exclude the last records from table } else { - stime = pCmd->etime; + stime = pCmd->stime; } - } else { - // timewindow based aggregation stream + } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now - stime = ((int64_t)taosGetTimestampSec() * 1000L / pStream->interval) * pStream->interval; + stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime, stime); } else { int64_t newStime = (stime / pStream->interval) * pStream->interval; @@ -448,26 +407,97 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, } } - pStream->stime = stime; + return stime; +} - int64_t timer = pStream->stime - taosGetTimestampSec() * 1000L; +static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { + int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision); if (timer < 0) timer = 0; int64_t delayDelta = (int64_t)(pStream->interval * 0.1); - if (delayDelta > tsMaxStreamComputDelay) { - delayDelta = tsMaxStreamComputDelay; + + int64_t maxDelay = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; + if (delayDelta > maxDelay) { + delayDelta = maxDelay; } + int64_t startDelay = + (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay; + srand(time(NULL)); timer += (rand() % delayDelta); // a random number - if (timer < tsStreamCompStartDelay || timer > tsMaxStreamComputDelay) { - timer = (timer % tsStreamCompStartDelay) + tsStreamCompStartDelay; + if (timer < startDelay || timer > maxDelay) { + timer = (timer % startDelay) + startDelay; } - taosTmrReset(tscProcessStreamTimer, timer, pStream, tscTmr, &pStream->pTimer); - tscTrace("%p stream:%p is opened, query on:%s, interval:%ld, sliding:%ld, first launched in:%ld ms, sql:%s", - pSql, pStream, pSql->cmd.name, pStream->interval, pStream->slidingTime, timer, sqlstr); + return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; +} + +TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)) { + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) return NULL; + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { // todo set corect error msg + return NULL; + } + + pSql->signature = pSql; + pSql->pTscObj = pObj; + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + + pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); + if (pSql->sqlstr == NULL) { // todo set corect error msg + tfree(pSql); + return NULL; + } + strcpy(pSql->sqlstr, sqlstr); + + sem_init(&pSql->rspSem, 0, 0); + sem_init(&pSql->emptyRspSem, 0, 1); + + SSqlInfo SQLInfo = {0}; + tSQLParse(&SQLInfo, pSql->sqlstr); + pRes->code = tscToSQLCmd(pSql, &SQLInfo); + SQLInfoDestroy(&SQLInfo); + + if (pRes->code != TSDB_CODE_SUCCESS) { + tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscFreeSqlObj(pSql); + return NULL; + } + + SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); + if (pStream == NULL) { + tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscFreeSqlObj(pSql); + return NULL; + } + + pStream->fp = fp; + pStream->callback = callback; + pStream->param = param; + pStream->pSql = pSql; + pStream->ctime = taosGetTimestamp(pCmd->pMeterMeta->precision); + pStream->etime = pCmd->etime; + pStream->precision = pCmd->pMeterMeta->precision; + + pSql->pStream = pStream; + tscAddIntoStreamList(pStream); + + tscSetSlidingWindowInfo(pSql, pStream); + pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); + + int64_t starttime = tscGetLaunchTimestamp(pStream); + taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); + + tscTrace("%p stream:%p is opened, query on:%s, interval:%lld, sliding:%lld, first launched in:%lld, sql:%s", pSql, + pStream, pSql->cmd.name, pStream->interval, pStream->slidingTime, starttime, sqlstr); return pStream; }