提交 7c927553 编写于 作者: S slguan

Processing Stream Computing Problems in Microsecond Accuracy Database

上级 fa719d5c
......@@ -334,6 +334,7 @@ typedef struct _sstream {
// stream will be closed
int64_t interval;
int64_t slidingTime;
int16_t precision;
void * pTimer;
void (*fp)();
......
......@@ -19,8 +19,8 @@
#include "ttimer.h"
#include "tutil.h"
#include "tsclient.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscProfile.h"
......@@ -40,10 +40,15 @@ static bool isProjectStream(SSqlCmd *pCmd) {
return true;
}
static int64_t tscGetRetryDelayTime(int64_t slidingTime) {
float RETRY_RANGE_FACTOR = 0.3;
static int64_t tscGetRetryDelayTime(int64_t slidingTime, int16_t prec) {
float retryRangeFactor = 0.3;
// change to ms
if (prec == TSDB_TIME_PRECISION_MICRO) {
slidingTime = slidingTime / 1000;
}
int64_t retryDelta = (int64_t)tsStreamCompRetryDelay * RETRY_RANGE_FACTOR;
int64_t retryDelta = (int64_t)tsStreamCompRetryDelay * retryRangeFactor;
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L;
if (slidingTime < retryDelta) {
......@@ -76,8 +81,8 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime);
tscError("%p stream:%p,get metermeta failed, retry in %ldms.", pStream->pSql, pStream, retryDelayTime);
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %lldms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
return;
......@@ -105,7 +110,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
*/
pSql->cmd.stime = pStream->stime; // start time
pSql->cmd.etime = taosGetTimestampMs(); // end time
pSql->cmd.etime = taosGetTimestamp(pStream->precision); // end time
if (pSql->cmd.etime > pStream->etime) {
pSql->cmd.etime = pStream->etime;
}
......@@ -126,8 +131,9 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) {
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime);
tscError("%p stream:%p, query data failed, code:%d, retry in %ldms", pStream->pSql, pStream, numOfRows, retryDelay);
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, query data failed, code:%d, retry in %lldms", pStream->pSql, pStream, numOfRows,
retryDelay);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -154,8 +160,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
SSqlObj * pSql = (SSqlObj *)res;
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %ldms", pSql, pStream, numOfRows, retryDelayTime);
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %lldms", pSql, pStream, numOfRows, retryDelayTime);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
......@@ -247,7 +253,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) {
if (isProjectStream(&pSql->cmd)) {
int64_t now = taosGetTimestampMs();
int64_t now = taosGetTimestamp(pStream->precision);
int64_t etime = now > pStream->etime ? pStream->etime : now;
if (pStream->etime < now && now - pStream->etime > tsMaxRetentWindow) {
......@@ -265,11 +271,11 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
return;
}
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld",
pStream->pSql, pStream, now + timer, timer, pStream->stime, etime);
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream,
now + timer, timer, pStream->stime, etime);
} else {
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld",
pStream->pSql, pStream, pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1);
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream,
pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1);
}
pSql->cmd.command = TSDB_SQL_SELECT;
......@@ -288,8 +294,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
*/
timer = pStream->slidingTime;
if (pStream->stime > pStream->etime) {
tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream",
pStream->pSql, pStream, pStream->stime, pStream->etime);
tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime);
// TODO : How to terminate stream here
taos_close_stream(pStream);
if (pStream->callback) {
......@@ -301,8 +307,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
} else {
pStream->stime += pStream->slidingTime;
if ((pStream->stime - pStream->interval) >= pStream->etime) {
tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream",
pStream->pSql, pStream, pStream->stime, pStream->etime);
tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime);
// TODO : How to terminate stream here
taos_close_stream(pStream);
if (pStream->callback) {
......@@ -312,7 +318,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return;
}
timer = pStream->stime - taosGetTimestampSec() * 1000L;
timer = pStream->stime - taosGetTimestamp(pStream->precision);
if (timer < 0) {
timer = 0;
}
......@@ -320,108 +326,62 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t delayDelta = (int64_t)(pStream->slidingTime * 0.1);
delayDelta = (rand() % delayDelta);
if (delayDelta > tsMaxStreamComputDelay) {
delayDelta = tsMaxStreamComputDelay;
int64_t maxDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
}
timer += delayDelta; // a random number
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
timer = timer / 1000L;
}
tscSetRetryTimer(pStream, pSql, timer);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
if (tscEmbedded) {
tscError("%p server out of memory", pSql);
pSql->res.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
} else {
tscError("%p client out of memory", pSql);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
}
return NULL;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
int32_t len = strlen(sqlstr);
pSql->sqlstr = malloc(strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
if (tscEmbedded) {
tscError("%p server out of memory", pSql);
pSql->res.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
} else {
tscError("%p client out of memory", pSql);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
}
return NULL;
}
strcpy(pSql->sqlstr, sqlstr);
pSql->sqlstr[len] = 0;
sem_init(&pSql->rspSem, 0, 0);
sem_init(&pSql->emptyRspSem, 0, 1);
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
if (pCmd->nAggTimeInterval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%lld", pSql, pStream,
pCmd->nAggTimeInterval, minIntervalTime);
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
if (pRes->code != 0) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
pCmd->nAggTimeInterval = minIntervalTime;
}
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) return NULL;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->ctime = taosGetTimestampMs();
pStream->etime = (pCmd->etime) ? pCmd->etime : INT64_MAX;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream);
if (pCmd->nAggTimeInterval < tsMinIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ldms too small, reset to:%ldms", pSql, pStream,
pCmd->nAggTimeInterval, tsMinIntervalTime);
pCmd->nAggTimeInterval = tsMinIntervalTime;
}
pStream->interval = pCmd->nAggTimeInterval; // it shall be derived from sql string
if (pCmd->nSlidingTime == 0) {
pCmd->nSlidingTime = pCmd->nAggTimeInterval;
}
if (pCmd->nSlidingTime < tsMinSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%lldms too small, reset to:%lldms",
pSql, pStream, pCmd->nSlidingTime, tsMinSlidingTime);
int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pCmd->nSlidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%lld too small, reset to:%lld", pSql, pStream, pCmd->nSlidingTime,
minSlidingTime);
pCmd->nSlidingTime = tsMinSlidingTime;
pCmd->nSlidingTime = minSlidingTime;
}
if (pCmd->nSlidingTime > pCmd->nAggTimeInterval) {
tscWarn("%p stream:%p, sliding value:%lldms can not be larger than interval range, reset to:%lldms",
pSql, pStream, pCmd->nSlidingTime, pCmd->nAggTimeInterval);
tscWarn("%p stream:%p, sliding value:%lld can not be larger than interval range, reset to:%lld", pSql, pStream,
pCmd->nSlidingTime, pCmd->nAggTimeInterval);
pCmd->nSlidingTime = pCmd->nAggTimeInterval;
}
pStream->slidingTime = pCmd->nSlidingTime;
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SSqlCmd *pCmd = &pSql->cmd;
if (isProjectStream(pCmd)) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -429,15 +389,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
pStream->slidingTime = tsProjectExecInterval;
if (stime != 0) { // first projection start from the latest event timestamp
assert(stime >= pCmd->etime);
assert(stime >= pCmd->stime);
stime += 1; // exclude the last records from table
} else {
stime = pCmd->etime;
stime = pCmd->stime;
}
} else {
// timewindow based aggregation stream
} else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestampSec() * 1000L / pStream->interval) * pStream->interval;
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime, stime);
} else {
int64_t newStime = (stime / pStream->interval) * pStream->interval;
......@@ -448,26 +407,97 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
}
}
pStream->stime = stime;
return stime;
}
int64_t timer = pStream->stime - taosGetTimestampSec() * 1000L;
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision);
if (timer < 0) timer = 0;
int64_t delayDelta = (int64_t)(pStream->interval * 0.1);
if (delayDelta > tsMaxStreamComputDelay) {
delayDelta = tsMaxStreamComputDelay;
int64_t maxDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
if (delayDelta > maxDelay) {
delayDelta = maxDelay;
}
int64_t startDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
srand(time(NULL));
timer += (rand() % delayDelta); // a random number
if (timer < tsStreamCompStartDelay || timer > tsMaxStreamComputDelay) {
timer = (timer % tsStreamCompStartDelay) + tsStreamCompStartDelay;
if (timer < startDelay || timer > maxDelay) {
timer = (timer % startDelay) + startDelay;
}
taosTmrReset(tscProcessStreamTimer, timer, pStream, tscTmr, &pStream->pTimer);
tscTrace("%p stream:%p is opened, query on:%s, interval:%ld, sliding:%ld, first launched in:%ld ms, sql:%s",
pSql, pStream, pSql->cmd.name, pStream->interval, pStream->slidingTime, timer, sqlstr);
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { // todo set corect error msg
return NULL;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) { // todo set corect error msg
tfree(pSql);
return NULL;
}
strcpy(pSql->sqlstr, sqlstr);
sem_init(&pSql->rspSem, 0, 0);
sem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->ctime = taosGetTimestamp(pCmd->pMeterMeta->precision);
pStream->etime = pCmd->etime;
pStream->precision = pCmd->pMeterMeta->precision;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscTrace("%p stream:%p is opened, query on:%s, interval:%lld, sliding:%lld, first launched in:%lld, sql:%s", pSql,
pStream, pSql->cmd.name, pStream->interval, pStream->slidingTime, starttime, sqlstr);
return pStream;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册