diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e91b05d104e10e43cf1430c45a55a5e648f2de92..da91dc5649123e87b4ff7ead405aaceb6f9ee675 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -465,6 +465,12 @@ void tscDestroyResPointerInfo(SSqlRes *pRes); void tscFreeSqlCmdData(SSqlCmd *pCmd); +/** + * free query result of the sql object + * @param pObj + */ +void tscFreeSqlResult(SSqlObj* pSql); + /** * only free part of resources allocated during query. * Note: this function is multi-thread safe. diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 9c812d4187394505d0f745e016339037e7d02bc5..ad4a29f20e62fbb6eff1b146aff6b18c4e725ab1 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -694,7 +694,7 @@ int taos_select_db(TAOS *taos, const char *db) { return taos_query(taos, sql); } -void taos_free_result(TAOS_RES *res) { +void taos_free_result_imp(TAOS_RES* res, int keepCmd) { if (res == NULL) return; SSqlObj *pSql = (SSqlObj *)res; @@ -712,6 +712,8 @@ void taos_free_result(TAOS_RES *res) { pSql->thandle = NULL; tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); + } else if (keepCmd) { + tscFreeSqlResult(pSql); } else { tscFreeSqlObjPartial(pSql); } @@ -761,8 +763,13 @@ void taos_free_result(TAOS_RES *res) { * Then this object will be reused and no free operation is required. */ pSql->thandle = NULL; - tscFreeSqlObjPartial(pSql); - tscTrace("%p sql result is freed by app", pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + tscTrace("%p sql result is freed by app while sql command is kept", pSql); + } else { + tscFreeSqlObjPartial(pSql); + tscTrace("%p sql result is freed by app", pSql); + } } } else { // if no free resource msg is sent to vnode, we free this object immediately. @@ -772,6 +779,9 @@ void taos_free_result(TAOS_RES *res) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); tscFreeSqlObj(pSql); tscTrace("%p Async sql result is freed by app", pSql); + } else if (keepCmd) { + tscFreeSqlResult(pSql); + tscTrace("%p sql result is freed while sql command is kept", pSql); } else { tscFreeSqlObjPartial(pSql); tscTrace("%p sql result is freed", pSql); @@ -779,6 +789,10 @@ void taos_free_result(TAOS_RES *res) { } } +void taos_free_result(TAOS_RES *res) { + taos_free_result_imp(res, 0); +} + int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; int code; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 910f9412f295c60f24979abb3f0709c59e035df1..b2c332deff60712bdcd1c47e74b19b5f65278380 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -25,6 +25,7 @@ #include "ttimer.h" #include "tutil.h" #include "tscUtil.h" +#include "tcache.h" typedef struct SSubscriptionProgress { int64_t uid; @@ -82,7 +83,7 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { else if (p->uid < uid) s = m + 1; else { - if (ts >= p->key) p->key = ts + 1; + if (ts >= p->key) p->key = ts; break; } } @@ -148,25 +149,27 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { TAOS_RES* res = taos_consume(pSub); if (res != NULL) { pSub->fp(pSub, res, pSub->param, 0); - // TODO: memory leak } taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); } -bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { +int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false); if (code != TSDB_CODE_SUCCESS) { - taos_unsubscribe(pSub); - return false; + tscError("failed to parse sql statement: %s", pSub->topic); + return 0; } int numOfMeters = 0; SSubscriptionProgress* progress = NULL; -// ??? if there's more than one vnode SSqlCmd* pCmd = &pSub->pSql->cmd; + if (pCmd->command != TSDB_SQL_SELECT) { + tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); + return 0; + } SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { @@ -177,14 +180,20 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { progress[0].key = tscGetSubscriptionProgress(pSub, uid); } else { SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); - numOfMeters = pVnodeSidList->numOfSids; + for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + numOfMeters += pVnodeSidList->numOfSids; + } progress = calloc(numOfMeters, sizeof(SSubscriptionProgress)); - for (int32_t i = 0; i < numOfMeters; ++i) { - SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); - int64_t uid = pMeterInfo->uid; - progress[i].uid = uid; - progress[i].key = tscGetSubscriptionProgress(pSub, uid); + numOfMeters = 0; + for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { + SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j); + int64_t uid = pMeterInfo->uid; + progress[numOfMeters].uid = uid; + progress[numOfMeters++].key = tscGetSubscriptionProgress(pSub, uid); + } } qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); } @@ -193,33 +202,26 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { pSub->numOfMeters = numOfMeters; pSub->progress = progress; - // timestamp must in the output column - SFieldInfo* pFieldInfo = &pCmd->fieldsInfo; - tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE); - tscSqlExprInsertEmpty(pCmd, pFieldInfo->numOfOutputCols - 1, TSDB_FUNC_PRJ); - tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false); - tscFieldInfoCalOffset(pCmd); - pSub->lastSyncTime = taosGetTimestampMs(); - return true; + return 1; } -static void tscLoadSubscriptionProgress(SSub* pSub) { +static int tscLoadSubscriptionProgress(SSub* pSub) { char buf[TSDB_MAX_SQL_LEN]; sprintf(buf, "%s/subscribe/%s", dataDir, pSub->topic); FILE* fp = fopen(buf, "r"); if (fp == NULL) { tscTrace("subscription progress file does not exist: %s", pSub->topic); - return true; + return 1; } if (fgets(buf, sizeof(buf), fp) == NULL) { tscTrace("invalid subscription progress file: %s", pSub->topic); fclose(fp); - return false; + return 0; } for (int i = 0; i < sizeof(buf); i++) { @@ -233,13 +235,13 @@ static void tscLoadSubscriptionProgress(SSub* pSub) { if (strcmp(buf, pSub->pSql->sqlstr) != 0) { tscTrace("subscription sql statement mismatch: %s", pSub->topic); fclose(fp); - return false; + return 0; } if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) { tscTrace("invalid subscription progress file: %s", pSub->topic); fclose(fp); - return false; + return 0; } int numOfMeters = atoi(buf); @@ -248,7 +250,7 @@ static void tscLoadSubscriptionProgress(SSub* pSub) { if (fgets(buf, sizeof(buf), fp) == NULL) { fclose(fp); free(progress); - return false; + return 0; } int64_t uid, key; sscanf(buf, "uid=%" SCNd64 ",progress=%" SCNd64, &uid, &key); @@ -261,7 +263,8 @@ static void tscLoadSubscriptionProgress(SSub* pSub) { qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); pSub->numOfMeters = numOfMeters; pSub->progress = progress; - return true; + tscTrace("subscription progress loaded, %d tables: %s", numOfMeters, pSub->topic); + return 1; } void tscSaveSubscriptionProgress(void* sub) { @@ -291,7 +294,6 @@ void tscSaveSubscriptionProgress(void* sub) { fclose(fp); } - TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { STscObj* pObj = (STscObj*)taos; if (pObj == NULL || pObj->signature != pObj) { @@ -313,6 +315,7 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char } if (!tscUpdateSubscription(pObj, pSub)) { + taos_unsubscribe(pSub, 1); return NULL; } @@ -326,39 +329,55 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char return pSub; } +void taos_free_result_imp(SSqlObj* pSql, int keepCmd); + TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; - if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 60 * 1000) { - taos_query(pSub->taos, "reset query cache;"); - // TODO: clear memory - if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; - } - SSqlObj* pSql = pSub->pSql; SSqlRes *pRes = &pSql->res; - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - pRes->qhandle = 0; - pSql->thandle = NULL; - pSql->cmd.command = TSDB_SQL_SELECT; + if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 1000) { + char* sqlstr = pSql->sqlstr; + pSql->sqlstr = NULL; + taos_free_result_imp(pSql, 0); + pSql->sqlstr = sqlstr; + taosClearDataCache(tscCacheHandle); + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + } else { + uint16_t type = pSql->cmd.type; + taos_free_result_imp(pSql, 1); + pRes->numOfRows = 1; + pRes->numOfTotal = 0; + pRes->qhandle = 0; + pSql->thandle = NULL; + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->cmd.type = type; + } tscDoQuery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { + tscRemoveFromSqlList(pSql); return NULL; } return pSql; } -void taos_unsubscribe(TAOS_SUB *tsub) { +void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { SSub *pSub = (SSub *)tsub; if (pSub == NULL || pSub->signature != pSub) return; if (pSub->pTimer != NULL) { taosTmrStop(pSub->pTimer); } + + if (!keepProgress) { + char path[256]; + sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic); + remove(path); + } + tscFreeSqlObj(pSub->pSql); free(pSub->progress); memset(pSub, 0, sizeof(*pSub)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 699c055249967fcdc79097e882b16134b1b27155..f8a25828f5b3922357ab473906d22fdde8a77a1c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -376,6 +376,21 @@ void tscFreeSqlCmdData(SSqlCmd* pCmd) { } } +void tscFreeSqlResult(SSqlObj* pSql) { + tfree(pSql->res.pRsp); + pSql->res.row = 0; + pSql->res.numOfRows = 0; + pSql->res.numOfTotal = 0; + + pSql->res.numOfGroups = 0; + tfree(pSql->res.pGroupRec); + + tscDestroyLocalReducer(pSql); + + tscDestroyResPointerInfo(&pSql->res); + tfree(pSql->res.pColumnIndex); +} + void tscFreeSqlObjPartial(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -399,20 +414,9 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tfree(pSql->sqlstr); pthread_mutex_unlock(&pObj->mutex); - tfree(pSql->res.pRsp); - pSql->res.row = 0; - pSql->res.numOfRows = 0; - pSql->res.numOfTotal = 0; - - pSql->res.numOfGroups = 0; - tfree(pSql->res.pGroupRec); - - tscDestroyLocalReducer(pSql); - + tscFreeSqlResult(pSql); tfree(pSql->pSubs); pSql->numOfSubs = 0; - tscDestroyResPointerInfo(pRes); - tfree(pSql->res.pColumnIndex); tscFreeSqlCmdData(pCmd); tscRemoveAllMeterMetaInfo(pCmd, false); diff --git a/src/inc/taos.h b/src/inc/taos.h index bdba644bb69b2204d43db3edcdbbaa9a022d58f2..3574ec5b30cb38da61235f3f97db48d458901934 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -119,7 +119,7 @@ DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); DLL_EXPORT TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); -DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); +DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)); diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index c81246bac6c714c6865616a2ba1ee8cc7055f82d..bcec3e61ec401641dff0167c98e41627865a89c8 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -29,7 +29,8 @@ int main(int argc, char *argv[]) { const char* user = "root"; const char* passwd = "taosdata"; const char* sql = "select * from meters;"; - int async = 1, restart = 0; + const char* topic = "test-multiple"; + int async = 1, restart = 0, keep = 1; TAOS_SUB* tsub = NULL; for (int i = 1; i < argc; i++) { @@ -55,6 +56,11 @@ int main(int argc, char *argv[]) { } if (strcmp(argv[i], "-single") == 0) { sql = "select * from t0;"; + topic = "test-single"; + continue; + } + if (strcmp(argv[i], "-nokeep") == 0) { + keep = 0; continue; } } @@ -69,9 +75,9 @@ int main(int argc, char *argv[]) { } if (async) { - tsub = taos_subscribe("test", restart, taos, sql, subscribe_callback, NULL, 1000); + tsub = taos_subscribe(topic, restart, taos, sql, subscribe_callback, NULL, 1000); } else { - tsub = taos_subscribe("test", restart, taos, sql, NULL, NULL, 0); + tsub = taos_subscribe(topic, restart, taos, sql, NULL, NULL, 0); } if (tsub == NULL) { @@ -87,7 +93,7 @@ int main(int argc, char *argv[]) { getchar(); } - taos_unsubscribe(tsub); + taos_unsubscribe(tsub, keep); return 0; }