diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 67fd34ffd7425cf921e927247149808540de3020..c002f2cf323a03ec024f4ebc41e04ac7e6c07571 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -283,6 +283,7 @@ typedef struct SSqlStream { int64_t ctime; // stream created time int64_t stime; // stream next executed time int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed + int64_t ltime; // stream last row time in stream table SInterval interval; void * pTimer; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 73a3fbafc3c5bb897d26375a129526e083eaf0e8..2fc18943da73ed394b61d9e42157f2e6523d2191 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -24,6 +24,7 @@ #include "tutil.h" #include "tscProfile.h" +#include "tscSubquery.h" static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); @@ -47,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) { static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) { float retryRangeFactor = 0.3f; - int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); - retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; + int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor); + retryDelta = ((rand() % retryDelta) + tsRetryStreamCompDelay) * 1000L; if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { // change to ms @@ -575,6 +576,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + // set stime with ltime if ltime > stime + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); + if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) { + tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime); + pStream->stime = pStream->ltime; + } + int64_t starttime = tscGetLaunchTimestamp(pStream); pCmd->command = TSDB_SQL_SELECT; @@ -590,7 +599,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { pStream->dstTable = dstTable; } -TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), +// fetchFp call back +void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) { + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = res; + + // get row data set to ltime + tscSetSqlOwner(pSql); + TAOS_ROW row = doSetResultRowData(pSql); + if( row && row[0] ) { + pStream->ltime = *((int64_t*)row[0]); + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime); + } + tscClearSqlOwner(pSql); + + // no condition call + tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS); + taos_free_result(res); +} + +// fp callback +void fpStreamLastRow(void* param ,TAOS_RES* res, int code) { + // check result successful + if (code != TSDB_CODE_SUCCESS) { + tscCreateStream(param, res, TSDB_CODE_SUCCESS); + taos_free_result(res); + return ; + } + + // asynchronous fetch last row data + taos_fetch_rows_a(res, fetchFpStreamLastRow, param); +} + +void cbParseSql(void* param, TAOS_RES* res, int code) { + // check result successful + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = pStream->pSql; + SSqlCmd* pCmd = &pSql->cmd; + if (code != TSDB_CODE_SUCCESS) { + pSql->res.code = code; + tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); + pStream->fp(pStream->param, NULL, NULL); + return; + } + + // check dstTable valid + if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) { + tscDebug(" cbParseSql dstTable is empty."); + tscCreateStream(param, res, code); + return ; + } + + // query stream last row time async + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); + taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); + return ; +} + +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; @@ -613,11 +681,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } - pStream->stime = stime; - pStream->fp = fp; + pStream->ltime = INT64_MIN; + pStream->stime = stime; + pStream->fp = fp; pStream->callback = callback; - pStream->param = param; - pStream->pSql = pSql; + pStream->param = param; + pStream->pSql = pSql; + pSql->pStream = pStream; + pSql->param = pStream; + pSql->maxRetry = TSDB_MAX_REPLICA; + tscSetStreamDestTable(pStream, dstTable); pSql->pStream = pStream; pSql->param = pStream; @@ -640,10 +713,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); + pSql->fp = cbParseSql; + pSql->fetchFp = cbParseSql; + + registerSqlObj(pSql); + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { - tscCreateStream(pStream, pSql, code); - } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + cbParseSql(pStream, pSql, code); + } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr); + } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); free(pStream); @@ -653,6 +733,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return pStream; } +TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)) { + return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback); +} + void taos_close_stream(TAOS_STREAM *handle) { SSqlStream *pStream = (SSqlStream *)handle; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index e07c3611d7b62b82689785446173d8a0d2fe9d11..88240f85edf8478e63168d0c2ad25fc5f98ee3ca 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -75,7 +75,7 @@ extern int32_t tsMinSlidingTime; extern int32_t tsMinIntervalTime; extern int32_t tsMaxStreamComputDelay; extern int32_t tsStreamCompStartDelay; -extern int32_t tsStreamCompRetryDelay; +extern int32_t tsRetryStreamCompDelay; extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern int32_t tsProjectExecInterval; extern int64_t tsMaxRetentWindow; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index a1266bb2cb3d793088513a388568c74c94e9f54e..f8127b24b9c5d41a62242faf843d117b1a054a20 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -93,7 +93,7 @@ int32_t tsMaxStreamComputDelay = 20000; int32_t tsStreamCompStartDelay = 10000; // the stream computing delay time after executing failed, change accordingly -int32_t tsStreamCompRetryDelay = 10; +int32_t tsRetryStreamCompDelay = 10*1000; // The delayed computing ration. 10% of the whole computing time window by default. float tsStreamComputDelayRatio = 0.1f; @@ -710,7 +710,7 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); cfg.option = "retryStreamCompDelay"; - cfg.ptr = &tsStreamCompRetryDelay; + cfg.ptr = &tsRetryStreamCompDelay; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 10; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5d5d5f339eec62db09f4c93acad0cea096c0962d..ee4be02b9004b56c0139b792393d00d9ea6a6f98 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { taosReleaseRef(cqObjRef, (int64_t)param); } +// inner implement in tscStream.c +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)); + static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; @@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { - tscSetStreamDestTable(pObj->pStream, pObj->dstTable); pContext->num++; cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr); } else { diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 6eaf4e18af53eac2b6e3c93749528993d8477e3d..0f71ffd0a37587cc6be895b4b8b168e6b8cfcaf8 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -432,7 +432,7 @@ class TDDnodes: self.simDeployed = False def init(self, path): - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): killCmd = "kill -TERM %s > /dev/null 2>&1" % processID @@ -545,14 +545,14 @@ class TDDnodes: for i in range(len(self.dnodes)): self.dnodes[i].stop() - psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" + psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") if processID: cmd = "sudo systemctl stop taosd" os.system(cmd) # if os.system(cmd) != 0 : # tdLog.exit(cmd) - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): killCmd = "kill -TERM %s > /dev/null 2>&1" % processID