提交 4b4199ce 编写于 作者: T tickduan

cq support continue query from last stop time

上级 1ee65f6c
......@@ -422,6 +422,7 @@ typedef struct SSqlStream {
int64_t ctime; // stream created time
int64_t stime; // stream next executed time
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
int64_t ltime; // stream last row time in stream table
SInterval interval;
void * pTimer;
......
......@@ -538,12 +538,11 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
///*
//
// get tableName last row time, if have error return zero.
//
static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) {
int64_t last_time = 0;
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", tableName);
......@@ -555,7 +554,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con
// only fetch one row
TAOS_ROW row = taos_fetch_row(res);
if( row[0] ) {
if( row && row[0] ) {
last_time = *((int64_t*)row[0]);
}
......@@ -563,7 +562,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con
taos_free_result(res);
return last_time;
}
//*/
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
......@@ -597,10 +596,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
// set output table last record time to stime if have, why do this, because continue with last break
int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable);
// set output table last record time to stime if have, why do this, because continue with last brea
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable);
pStream->ltime = last_time;
tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time);
if(last_time > 0 && last_time > pStream->stime) {
// can replace stime with last row time
tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time);
pStream->stime = last_time;
}
......@@ -619,6 +622,24 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream->dstTable = dstTable;
}
// already run on another thread
void tscCreateStreamThread(SSchedMsg* pMsg) {
tscDebug(" new thread Sched call tscCreateStream begin...");
tscCreateStream(pMsg->ahandle, NULL, 0);
tscDebug(" new thread Sched call tscCreateStream end.");
return ;
}
// parsesql async response return and change run thread
void tsParseSqlRet(void* param, TAOS_RES* res, int code) {
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscCreateStreamThread;
schedMsg.ahandle = param;
schedMsg.thandle = res;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
......@@ -664,15 +685,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
pSql->fp = tsParseSqlRet;
pSql->fetchFp = tsParseSqlRet;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug(" cq parseSql IN Process pass. ");
} else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册