From 3537de849a8056d021655528b4469d2a1ecb5ea2 Mon Sep 17 00:00:00 2001 From: localvar Date: Fri, 27 Dec 2019 17:32:53 +0800 Subject: [PATCH] update query process --- src/client/src/tscServer.c | 3 ++- src/client/src/tscSub.c | 16 ++-------------- src/inc/taos.h | 2 +- src/system/detail/src/vnodeQueryProcess.c | 9 +++++++++ 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1aa56169df..ca96103593 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -46,6 +46,8 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; +TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid); +void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts); static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); } @@ -3526,7 +3528,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } -void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts); int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index a55e889c1c..5a2ae4ee4e 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -144,8 +144,8 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) { TAOS_RES* res = taos_consume(pSub); if (res != NULL) { - pSub->fp(pSub->param, res, 0); - taos_free_result(res); + pSub->fp(pSub, res, pSub->param, 0); + // TODO: memory leak } taosTmrReset(tscProcessSubscribeTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); @@ -240,15 +240,3 @@ void taos_unsubscribe(TAOS_SUB *tsub) { memset(pSub, 0, sizeof(*pSub)); free(pSub); } - -int taos_subfields_count(TAOS_SUB *tsub) { - SSub *pSub = (SSub *)tsub; - - return taos_num_fields(pSub->pSql); -} - -TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub) { - SSub *pSub = (SSub *)tsub; - - return pSub->pSql->cmd.fieldsInfo.pFields; -} diff --git a/src/inc/taos.h b/src/inc/taos.h index e6eac00a7c..aa2ed02f9b 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -116,7 +116,7 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); -typedef void (*TAOS_SUBSCRIBE_CALLBACK)(void *param, TAOS_RES *res, int code); +typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 2e59789b27..ae73ba5249 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -682,10 +682,19 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; +pQInfo->killed = 0; +/* if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } +*/ + + TSKEY skey = pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key; + if (skey > 0) { + pQuery->skey = skey; + // pQuery->lastKey = ???; + } bool dataInDisk = true; bool dataInCache = true; -- GitLab