提交 3537de84 编写于 作者: weixin_48148422's avatar weixin_48148422

update query process

上级 804b5812
...@@ -46,6 +46,8 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); ...@@ -46,6 +46,8 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf); void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
void tscProcessActivityTimer(void *handle, void *tmrId); void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0}; int tscKeepConn[TSDB_SQL_MAX] = {0};
TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid);
void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts);
static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); } static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); }
...@@ -3526,7 +3528,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -3526,7 +3528,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0; return 0;
} }
void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts);
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
......
...@@ -144,8 +144,8 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) { ...@@ -144,8 +144,8 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) {
TAOS_RES* res = taos_consume(pSub); TAOS_RES* res = taos_consume(pSub);
if (res != NULL) { if (res != NULL) {
pSub->fp(pSub->param, res, 0); pSub->fp(pSub, res, pSub->param, 0);
taos_free_result(res); // TODO: memory leak
} }
taosTmrReset(tscProcessSubscribeTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); taosTmrReset(tscProcessSubscribeTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
...@@ -240,15 +240,3 @@ void taos_unsubscribe(TAOS_SUB *tsub) { ...@@ -240,15 +240,3 @@ void taos_unsubscribe(TAOS_SUB *tsub) {
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
free(pSub); free(pSub);
} }
int taos_subfields_count(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub;
return taos_num_fields(pSub->pSql);
}
TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub;
return pSub->pSql->cmd.fieldsInfo.pFields;
}
...@@ -116,7 +116,7 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param ...@@ -116,7 +116,7 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(void *param, TAOS_RES *res, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub);
......
...@@ -682,10 +682,19 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { ...@@ -682,10 +682,19 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
while (pSupporter->meterIdx < pSupporter->numOfMeters) { while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx; int32_t k = pSupporter->meterIdx;
pQInfo->killed = 0;
/*
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
*/
TSKEY skey = pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key;
if (skey > 0) {
pQuery->skey = skey;
// pQuery->lastKey = ???;
}
bool dataInDisk = true; bool dataInDisk = true;
bool dataInCache = true; bool dataInCache = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册