提交 051af216 编写于 作者: weixin_48148422's avatar weixin_48148422

tbase-1422: free resource

上级 f2eac06a
......@@ -465,6 +465,12 @@ void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
/**
* free query result of the sql object
* @param pObj
*/
void tscFreeSqlResult(SSqlObj* pSql);
/**
* only free part of resources allocated during query.
* Note: this function is multi-thread safe.
......
......@@ -694,7 +694,7 @@ int taos_select_db(TAOS *taos, const char *db) {
return taos_query(taos, sql);
}
void taos_free_result(TAOS_RES *res) {
void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
if (res == NULL) return;
SSqlObj *pSql = (SSqlObj *)res;
......@@ -712,6 +712,8 @@ void taos_free_result(TAOS_RES *res) {
pSql->thandle = NULL;
tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql);
} else if (keepCmd) {
tscFreeSqlResult(pSql);
} else {
tscFreeSqlObjPartial(pSql);
}
......@@ -761,8 +763,13 @@ void taos_free_result(TAOS_RES *res) {
* Then this object will be reused and no free operation is required.
*/
pSql->thandle = NULL;
tscFreeSqlObjPartial(pSql);
tscTrace("%p sql result is freed by app", pSql);
if (keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
}
} else {
// if no free resource msg is sent to vnode, we free this object immediately.
......@@ -772,6 +779,9 @@ void taos_free_result(TAOS_RES *res) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
tscFreeSqlObj(pSql);
tscTrace("%p Async sql result is freed by app", pSql);
} else if (keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscTrace("%p sql result is freed", pSql);
......@@ -779,6 +789,10 @@ void taos_free_result(TAOS_RES *res) {
}
}
void taos_free_result(TAOS_RES *res) {
taos_free_result_imp(res, 0);
}
int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
int code;
......
......@@ -25,6 +25,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "tscUtil.h"
#include "tcache.h"
typedef struct SSubscriptionProgress {
int64_t uid;
......@@ -82,7 +83,7 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
else if (p->uid < uid)
s = m + 1;
else {
if (ts >= p->key) p->key = ts + 1;
if (ts >= p->key) p->key = ts;
break;
}
}
......@@ -148,25 +149,27 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
TAOS_RES* res = taos_consume(pSub);
if (res != NULL) {
pSub->fp(pSub, res, pSub->param, 0);
// TODO: memory leak
}
taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
}
bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false);
if (code != TSDB_CODE_SUCCESS) {
taos_unsubscribe(pSub);
return false;
tscError("failed to parse sql statement: %s", pSub->topic);
return 0;
}
int numOfMeters = 0;
SSubscriptionProgress* progress = NULL;
// ??? if there's more than one vnode
SSqlCmd* pCmd = &pSub->pSql->cmd;
if (pCmd->command != TSDB_SQL_SELECT) {
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
return 0;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
......@@ -177,14 +180,20 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
numOfMeters = pVnodeSidList->numOfSids;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
numOfMeters += pVnodeSidList->numOfSids;
}
progress = calloc(numOfMeters, sizeof(SSubscriptionProgress));
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
int64_t uid = pMeterInfo->uid;
progress[i].uid = uid;
progress[i].key = tscGetSubscriptionProgress(pSub, uid);
numOfMeters = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pMeterInfo->uid;
progress[numOfMeters].uid = uid;
progress[numOfMeters++].key = tscGetSubscriptionProgress(pSub, uid);
}
}
qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
}
......@@ -193,33 +202,26 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->numOfMeters = numOfMeters;
pSub->progress = progress;
// timestamp must in the output column
SFieldInfo* pFieldInfo = &pCmd->fieldsInfo;
tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE);
tscSqlExprInsertEmpty(pCmd, pFieldInfo->numOfOutputCols - 1, TSDB_FUNC_PRJ);
tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false);
tscFieldInfoCalOffset(pCmd);
pSub->lastSyncTime = taosGetTimestampMs();
return true;
return 1;
}
static void tscLoadSubscriptionProgress(SSub* pSub) {
static int tscLoadSubscriptionProgress(SSub* pSub) {
char buf[TSDB_MAX_SQL_LEN];
sprintf(buf, "%s/subscribe/%s", dataDir, pSub->topic);
FILE* fp = fopen(buf, "r");
if (fp == NULL) {
tscTrace("subscription progress file does not exist: %s", pSub->topic);
return true;
return 1;
}
if (fgets(buf, sizeof(buf), fp) == NULL) {
tscTrace("invalid subscription progress file: %s", pSub->topic);
fclose(fp);
return false;
return 0;
}
for (int i = 0; i < sizeof(buf); i++) {
......@@ -233,13 +235,13 @@ static void tscLoadSubscriptionProgress(SSub* pSub) {
if (strcmp(buf, pSub->pSql->sqlstr) != 0) {
tscTrace("subscription sql statement mismatch: %s", pSub->topic);
fclose(fp);
return false;
return 0;
}
if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) {
tscTrace("invalid subscription progress file: %s", pSub->topic);
fclose(fp);
return false;
return 0;
}
int numOfMeters = atoi(buf);
......@@ -248,7 +250,7 @@ static void tscLoadSubscriptionProgress(SSub* pSub) {
if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp);
free(progress);
return false;
return 0;
}
int64_t uid, key;
sscanf(buf, "uid=%" SCNd64 ",progress=%" SCNd64, &uid, &key);
......@@ -261,7 +263,8 @@ static void tscLoadSubscriptionProgress(SSub* pSub) {
qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
pSub->numOfMeters = numOfMeters;
pSub->progress = progress;
return true;
tscTrace("subscription progress loaded, %d tables: %s", numOfMeters, pSub->topic);
return 1;
}
void tscSaveSubscriptionProgress(void* sub) {
......@@ -291,7 +294,6 @@ void tscSaveSubscriptionProgress(void* sub) {
fclose(fp);
}
TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
......@@ -313,6 +315,7 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char
}
if (!tscUpdateSubscription(pObj, pSub)) {
taos_unsubscribe(pSub, 1);
return NULL;
}
......@@ -326,39 +329,55 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char
return pSub;
}
void taos_free_result_imp(SSqlObj* pSql, int keepCmd);
TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL;
if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 60 * 1000) {
taos_query(pSub->taos, "reset query cache;");
// TODO: clear memory
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
}
SSqlObj* pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT;
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 1000) {
char* sqlstr = pSql->sqlstr;
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
pSql->sqlstr = sqlstr;
taosClearDataCache(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
} else {
uint16_t type = pSql->cmd.type;
taos_free_result_imp(pSql, 1);
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->cmd.type = type;
}
tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscRemoveFromSqlList(pSql);
return NULL;
}
return pSql;
}
void taos_unsubscribe(TAOS_SUB *tsub) {
void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
SSub *pSub = (SSub *)tsub;
if (pSub == NULL || pSub->signature != pSub) return;
if (pSub->pTimer != NULL) {
taosTmrStop(pSub->pTimer);
}
if (!keepProgress) {
char path[256];
sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic);
remove(path);
}
tscFreeSqlObj(pSub->pSql);
free(pSub->progress);
memset(pSub, 0, sizeof(*pSub));
......
......@@ -376,6 +376,21 @@ void tscFreeSqlCmdData(SSqlCmd* pCmd) {
}
}
void tscFreeSqlResult(SSqlObj* pSql) {
tfree(pSql->res.pRsp);
pSql->res.row = 0;
pSql->res.numOfRows = 0;
pSql->res.numOfTotal = 0;
pSql->res.numOfGroups = 0;
tfree(pSql->res.pGroupRec);
tscDestroyLocalReducer(pSql);
tscDestroyResPointerInfo(&pSql->res);
tfree(pSql->res.pColumnIndex);
}
void tscFreeSqlObjPartial(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -399,20 +414,9 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
tfree(pSql->sqlstr);
pthread_mutex_unlock(&pObj->mutex);
tfree(pSql->res.pRsp);
pSql->res.row = 0;
pSql->res.numOfRows = 0;
pSql->res.numOfTotal = 0;
pSql->res.numOfGroups = 0;
tfree(pSql->res.pGroupRec);
tscDestroyLocalReducer(pSql);
tscFreeSqlResult(pSql);
tfree(pSql->pSubs);
pSql->numOfSubs = 0;
tscDestroyResPointerInfo(pRes);
tfree(pSql->res.pColumnIndex);
tscFreeSqlCmdData(pCmd);
tscRemoveAllMeterMetaInfo(pCmd, false);
......
......@@ -119,7 +119,7 @@ DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
......
......@@ -29,7 +29,8 @@ int main(int argc, char *argv[]) {
const char* user = "root";
const char* passwd = "taosdata";
const char* sql = "select * from meters;";
int async = 1, restart = 0;
const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1;
TAOS_SUB* tsub = NULL;
for (int i = 1; i < argc; i++) {
......@@ -55,6 +56,11 @@ int main(int argc, char *argv[]) {
}
if (strcmp(argv[i], "-single") == 0) {
sql = "select * from t0;";
topic = "test-single";
continue;
}
if (strcmp(argv[i], "-nokeep") == 0) {
keep = 0;
continue;
}
}
......@@ -69,9 +75,9 @@ int main(int argc, char *argv[]) {
}
if (async) {
tsub = taos_subscribe("test", restart, taos, sql, subscribe_callback, NULL, 1000);
tsub = taos_subscribe(topic, restart, taos, sql, subscribe_callback, NULL, 1000);
} else {
tsub = taos_subscribe("test", restart, taos, sql, NULL, NULL, 0);
tsub = taos_subscribe(topic, restart, taos, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
......@@ -87,7 +93,7 @@ int main(int argc, char *argv[]) {
getchar();
}
taos_unsubscribe(tsub);
taos_unsubscribe(tsub, keep);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册