提交 398397bc 编写于 作者: T tickduan

change to asynchrous call mode with support last time query

上级 f3b42525
......@@ -25,6 +25,7 @@
#include "tutil.h"
#include "tscProfile.h"
#include "tscSubquery.h"
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
......@@ -538,31 +539,7 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
///*
//
// get tableName last row time, if have error return zero.
//
static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) {
int64_t last_time = 0;
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", tableName);
// query sql
TAOS_RES* res = taos_query(pSql->pTscObj, sql);
if(res == NULL)
return 0;
// only fetch one row
TAOS_ROW row = taos_fetch_row(res);
if( row && row[0] ) {
last_time = *((int64_t*)row[0]);
}
// free and return
taos_free_result(res);
return last_time;
}
//*/
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
......@@ -596,15 +573,12 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
// set output table last record time to stime if have, why do this, because continue with last brea
// set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable);
pStream->ltime = last_time;
tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time);
if(last_time > 0 && last_time > pStream->stime) {
// can replace stime with last row time
tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time);
pStream->stime = last_time;
tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
if(pStream->ltime > 0 && pStream->ltime > pStream->stime) {
tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" ", dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime;
}
int64_t starttime = tscGetLaunchTimestamp(pStream);
......@@ -622,25 +596,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream->dstTable = dstTable;
}
// already run on another thread
void tscCreateStreamThread(SSchedMsg* pMsg) {
tscDebug(" new thread Sched call tscCreateStream begin...");
tscCreateStream(pMsg->ahandle, NULL, 0);
tscDebug(" new thread Sched call tscCreateStream end.");
return ;
// fetchFp call back
void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = res;
// get row data set to ltime
tscSetSqlOwner(pSql);
TAOS_ROW row = doSetResultRowData(pSql);
if( row && row[0] ) {
pStream->ltime = *((int64_t*)row[0]);
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime);
}
tscClearSqlOwner(pSql);
// no condition call
tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS);
taos_free_result(res);
}
// parsesql async response return and change run thread
void tsParseSqlRet(void* param, TAOS_RES* res, int code) {
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscCreateStreamThread;
schedMsg.ahandle = param;
schedMsg.thandle = res;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
// fp callback
void fpStreamLastRow(void* param ,TAOS_RES* res, int code) {
// check result successful
if (code != TSDB_CODE_SUCCESS) {
tscCreateStream(param, res, TSDB_CODE_SUCCESS);
taos_free_result(res);
return ;
}
// asynchronous fetch last row data
taos_fetch_rows_a(res, fetchFpStreamLastRow, param);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
void cbParseSql(void* param, TAOS_RES* res, int code) {
// check result successful
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
// check dstTable valid
if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) {
tscDebug(" cbParseSql dstTable is empty.");
tscCreateStream(param, res, code);
return ;
}
// query stream last row time async
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
return ;
}
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
......@@ -671,6 +686,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable);
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
......@@ -685,16 +701,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tsParseSqlRet;
pSql->fetchFp = tsParseSqlRet;
pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code);
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug(" cq parseSql IN Process pass. ");
tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
} else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
......@@ -705,6 +721,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return pStream;
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
}
void taos_close_stream(TAOS_STREAM *handle) {
SSqlStream *pStream = (SSqlStream *)handle;
......
......@@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
taosReleaseRef(cqObjRef, (int64_t)param);
}
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
......@@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
pContext->num++;
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册