diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 11b7815586bcdae37f7315e40e547857bb898e5f..74a0e8c11cdfe70584ec99a4d1061acc0c1eeac3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,7 @@ typedef struct STscObj { struct SSqlStream *streamList; void* pDnodeConn; pthread_mutex_t mutex; + T_REF_DECLARE(); } STscObj; typedef struct SSqlObj { @@ -503,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } } -extern SCacheObj* tscCacheHandle; +extern SCacheObj* tscMetaCache; extern SCacheObj* tscObjCache; extern void * tscTmr; extern void * tscQhandle; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 5e9aa1b1f80689fd7a52fd476dbf75d2f0a70811..09610575f619be3f02059b6e6ca1ba2c56edcb62 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -51,6 +51,11 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->fp = fp; pSql->fetchFp = fp; + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + + T_REF_INC(pSql->pTscObj); + pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); @@ -64,9 +69,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); pCmd->curSql = pSql->sqlstr; - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 4b6174e13d08116bf245e8ed870659428a07735a..50f82ae662840a80b717cc99934640626a679d1f 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -429,7 +429,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { pRes->qhandle = 0x1; pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscMetaCache); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index de00827c3a8b6853b9bcdbf60f71f2d2f5805de9..e8f1a2d758ef72c95506e0d8673a8f5bb950bcc1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1181,13 +1181,14 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t } END_TRY len = tbufTell(&bw); - char* c = tbufGetData(&bw, true); + char* c = tbufGetData(&bw, false); // set the serialized binary string as the parameter of arithmetic expression addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); + tbufCloseWriter(&bw); taosArrayDestroy(colList); tExprTreeDestroy(&pNode, NULL); } else { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 26fe19d8be5d84b2b3430bc53bbc2cdcf50f7e32..1b6a77bdc737980e2cb7eccc777a9ebe82f78b69 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -373,17 +373,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } - bool shouldFree = tscShouldBeFreed(pSql);; + bool shouldFree = tscShouldBeFreed(pSql); if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p, false); + taosCacheRelease(tscObjCache, (void**) &p1, false); - if (shouldFree) { - taosCacheRelease(tscObjCache, (void **)&p1, true); + if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it + taosCacheRelease(tscObjCache, (void **)&p, true); tscDebug("%p sqlObj is automatically freed", pSql); } @@ -1718,7 +1718,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); assert(pTableMetaInfo->pTableMeta == NULL); - pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, + pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000); // todo handle out of memory case @@ -1830,7 +1830,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { // int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache // // pMeta->index = 0; - // (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer); + // (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer); // } } @@ -1917,12 +1917,14 @@ int tscProcessShowRsp(SSqlObj *pSql) { key[0] = pCmd->msgType + 'a'; strcpy(key + 1, "showlist"); - taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false); - + if (pTableMetaInfo->pTableMeta != NULL) { + taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false); + } + size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size, + pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size, tsTableMetaKeepTimer * 1000); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); @@ -1981,6 +1983,8 @@ static void createHBObj(STscObj* pObj) { pSql->pTscObj = pObj; pSql->signature = pSql; pObj->pHb = pSql; + T_REF_INC(pObj); + tscAddSubqueryInfo(&pObj->pHb->cmd); tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); @@ -2025,14 +2029,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { int tscProcessDropDbRsp(SSqlObj *pSql) { pSql->pTscObj->db[0] = 0; - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscMetaCache); return 0; } int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); + STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } @@ -2045,10 +2049,10 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { * instead. */ tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name); - taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); + taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true); if (pTableMetaInfo->pTableMeta) { - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); + taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true); } return 0; @@ -2057,21 +2061,21 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); + STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name); - taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); + taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true); if (pTableMetaInfo->pTableMeta) { bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); + taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true); if (isSuperTable) { // if it is a super table, reset whole query cache tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name); - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscMetaCache); } } @@ -2156,6 +2160,12 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_META; + T_REF_INC(pNew->pTscObj); + + // TODO add test case on x86 platform + uint64_t adr = (uint64_t) pNew; + pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000); + tscAddSubqueryInfo(&pNew->cmd); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); @@ -2179,10 +2189,6 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf pNew->fp = tscTableMetaCallBack; pNew->param = pSql; - // TODO add test case on x86 platform - uint64_t adr = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000); - int32_t code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated @@ -2196,10 +2202,10 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { // If this STableMetaInfo owns a table meta, release it first if (pTableMetaInfo->pTableMeta != NULL) { - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false); + taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false); } - pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); + pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, @@ -2234,7 +2240,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); } - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); + taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true); return getTableMetaFromMgmt(pSql, pTableMetaInfo); } @@ -2264,7 +2270,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_STABLEVGROUP; - + + // TODO TEST IT SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); if (pNewQueryInfo == NULL) { tscFreeSqlObj(pNew); @@ -2274,7 +2281,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); - STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta); + STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta); tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList); } @@ -2284,6 +2291,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { } pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; + T_REF_INC(pNew->pTscObj); uint64_t p = (uint64_t) pNew; pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 58859e944cc7e367da9ed924d8755d15b9a8c2bb..025f0a2b142486eb9cccde32cba6ec8dba2edadd 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -102,6 +102,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con pObj->signature = pObj; pObj->pDnodeConn = pDnodeConn; + T_REF_INIT_VAL(pObj, 1); tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); @@ -155,6 +156,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con *taos = pObj; } + T_REF_INC(pSql->pTscObj); + uint64_t key = (uint64_t) pSql; pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); @@ -261,6 +264,31 @@ void taos_close(TAOS *taos) { tscFreeSqlObj(pObj->pHb); } + // free all sqlObjs created by using this connect before free the STscObj + while(1) { + pthread_mutex_lock(&pObj->mutex); + void* p = pObj->sqlList; + pthread_mutex_unlock(&pObj->mutex); + + if (p == NULL) { + break; + } + + tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p); + taosMsleep(100); + + // todo fix me!! two threads call taos_free_result will cause problem. + tscDebug("%p free :%p", pObj, p); + taos_free_result(p); + } + + int32_t ref = T_REF_DEC(pObj); + assert(ref >= 0); + + if (ref > 0) { + return; + } + tscCloseTscObj(pObj); } @@ -537,7 +565,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static bool tscKillQueryInDnode(SSqlObj* pSql) { +static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -561,7 +589,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { cmd == TSDB_SQL_FETCH)) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s, ", pSql, sqlCmd[pCmd->command]); + tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]); tscProcessSql(pSql); return false; @@ -577,45 +605,15 @@ void taos_free_result(TAOS_RES *res) { return; } - assert(pSql->self != 0 && *pSql->self == pSql); +// assert(pSql->self != 0 && *pSql->self == pSql); bool freeNow = tscKillQueryInDnode(pSql); if (freeNow) { tscDebug("%p free sqlObj in cache", pSql); - taosCacheRelease(tscObjCache, (void**) &pSql->self, true); - } -} - -//static void doFreeResult(TAOS_RES *res) { -// SSqlObj *pSql = (SSqlObj *)res; -// -// if (pSql == NULL || pSql->signature != pSql) { -// tscDebug("%p sqlObj has been freed", pSql); -// return; -// } -// -// // The semaphore can not be changed while freeing async sub query objects. -// SSqlRes *pRes = &pSql->res; -// if (pRes == NULL || pRes->qhandle == 0) { -// tscFreeSqlObj(pSql); -// tscDebug("%p SqlObj is freed by app, qhandle is null", pSql); -// return; -// } -// -// // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node -// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); -// if (pQueryInfo == NULL) { -// tscFreeSqlObj(pSql); -// tscDebug("%p SqlObj is freed by app", pSql); -// return; -// } -// -// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; -// if (!tscKillQueryInDnode(pSql)) { -// tscFreeSqlObj(pSql); -// tscDebug("%p sqlObj is freed by app", pSql); -// } -//} + SSqlObj** p = pSql->self; + taosCacheRelease(tscObjCache, (void**) &p, true); + } +} int taos_errno(TAOS_RES *tres) { SSqlObj *pSql = (SSqlObj *) tres; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 79e00110932682d60cd6f4fe561fc4eb41cc18f1..71ba76dc6cb558453a92aa5c2d0df0905fa0a43a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -167,7 +167,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf retryDelay); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); - taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true); + taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true); taosTFree(pTableMetaInfo->vgroupList); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); @@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf // release the metric/meter meta information reference, so data in cache can be updated - taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); + taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); pSql->numOfSubs = 0; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 2c7fcf05c9fa586247b78a71a9715b93453f414d..82ce1d36791e2a5e79177ea1405bf90145f2776d 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -30,7 +30,7 @@ #include "tlocale.h" // global, not configurable -SCacheObj* tscCacheHandle; +SCacheObj* tscMetaCache; SCacheObj* tscObjCache; void * tscTmr; void * tscQhandle; @@ -145,9 +145,9 @@ void taos_init_imp(void) { refreshTime = refreshTime > 10 ? 10 : refreshTime; refreshTime = refreshTime < 10 ? 10 : refreshTime; - if (tscCacheHandle == NULL) { - tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); - tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime, false, tscFreeSqlObjInCache, "sqlObjHandle"); + if (tscMetaCache == NULL) { + tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); + tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj"); } tscDebug("client is initialized successfully"); @@ -156,9 +156,9 @@ void taos_init_imp(void) { void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_cleanup() { - if (tscCacheHandle != NULL) { - taosCacheCleanup(tscCacheHandle); - tscCacheHandle = NULL; + if (tscMetaCache != NULL) { + taosCacheCleanup(tscMetaCache); + tscMetaCache = NULL; taosCacheCleanup(tscObjCache); tscObjCache = NULL; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7d4369816a9fcc604adb9427796550fbccf9db2b..7f2c1716a6f8f2a1457d3deaed30740d286a86be 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -344,8 +344,6 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { } SSqlCmd* pCmd = &pSql->cmd; - STscObj* pObj = pSql->pTscObj; - int32_t cmd = pCmd->command; if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { @@ -353,11 +351,11 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { } // pSql->sqlstr will be used by tscBuildQueryStreamDesc - if (pObj->signature == pObj) { +// if (pObj->signature == pObj) { //pthread_mutex_lock(&pObj->mutex); taosTFree(pSql->sqlstr); //pthread_mutex_unlock(&pObj->mutex); - } +// } tscFreeSqlResult(pSql); @@ -384,42 +382,61 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->numOfSubs = 0; } +//static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { +// SSqlCmd* pCmd = &pSql->cmd; +// SSqlRes* pRes = &pSql->res; +// +// if (pRes == NULL || pRes->qhandle == 0) { +// return true; +// } +// +// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); +// if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { +// return true; +// } +// +// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// tscRemoveFromSqlList(pSql); +// +// int32_t cmd = pCmd->command; +// if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) && +// (cmd == TSDB_SQL_SELECT || +// cmd == TSDB_SQL_SHOW || +// cmd == TSDB_SQL_RETRIEVE || +// cmd == TSDB_SQL_FETCH)) { +// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; +// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; +// tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s, ", pSql, sqlCmd[pCmd->command]); +// +// tscProcessSql(pSql); +// return false; +// } +// +// return true; +//} + +/** + * The free operation will cause the pSql to be removed from hash table and free it in + * the function of processmsgfromserver is impossible in this case, since it will fail + * to retrieve pSqlObj in hashtable. + * + * @param pSql + */ void tscFreeSqlObjInCache(void *pSql) { assert(pSql != NULL); - SSqlObj** p = (SSqlObj**) pSql; + SSqlObj** p = (SSqlObj**)pSql; + assert((*p)->self != 0 && (*p)->self == (p)); tscFreeSqlObj(*p); } -static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - return false; - } - - int32_t cmd = pCmd->command; - if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) && - (cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_SHOW || cmd == TSDB_SQL_RETRIEVE || cmd == TSDB_SQL_FETCH)) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]); - tscProcessSql(pSql); - return true; - } - - return false; -} - void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; } tscDebug("%p start to free sqlObj", pSql); + STscObj* pTscObj = pSql->pTscObj; tscFreeSubobj(pSql); tscPartiallyFreeSqlObj(pSql); @@ -438,6 +455,13 @@ void tscFreeSqlObj(SSqlObj* pSql) { free(pSql); tscDebug("%p free sqlObj completed", pSql); + + int32_t ref = T_REF_DEC(pTscObj); + assert(ref >= 0); + + if (ref == 0) { + tscCloseTscObj(pTscObj); + } } void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { @@ -450,7 +474,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { // free the refcount for metermeta if (pDataBlock->pTableMeta != NULL) { - taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false); + taosCacheRelease(tscMetaCache, (void**)&(pDataBlock->pTableMeta), false); } taosTFree(pDataBlock); @@ -509,10 +533,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name)); if (pTableMetaInfo->pTableMeta != NULL) { - taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); + taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); } - pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta); + pTableMetaInfo->pTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pDataBlock->pTableMeta); } else { assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0); } @@ -583,7 +607,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff * due to operation such as drop database. So here we add the reference count directly instead of invoke * taosGetDataFromCache, which may return NULL value. */ - dataBuf->pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMeta); + dataBuf->pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMeta); assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL); *dataBlocks = dataBuf; @@ -777,28 +801,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { // TODO: all subqueries should be freed correctly before close this connection. void tscCloseTscObj(STscObj* pObj) { assert(pObj != NULL); - + pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - // wait for all sqlObjs created according to this connect closed - while(1) { - pthread_mutex_lock(&pObj->mutex); - void* p = pObj->sqlList; - pthread_mutex_unlock(&pObj->mutex); - - if (p == NULL) { - break; - } - - tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p); - taosMsleep(100); - - // todo fix me!! two threads call taos_free_result will cause problem. - tscDebug("%p free :%p", pObj, p); - taos_free_result(p); - } - if (pObj->pDnodeConn != NULL) { rpcClose(pObj->pDnodeConn); pObj->pDnodeConn = NULL; @@ -1743,7 +1749,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) } if (pTableMetaInfo->pTableMeta != NULL) { - taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); + taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); } taosTFree(pTableMetaInfo->vgroupList); @@ -1769,6 +1775,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm } pNew->pTscObj = pSql->pTscObj; + T_REF_INC(pNew->pTscObj); + pNew->signature = pNew; SSqlCmd* pCmd = &pNew->cmd; @@ -1800,6 +1808,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); + T_REF_INC(pNew->pTscObj); + uint64_t p = (uint64_t) pNew; pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); return pNew; @@ -1890,6 +1900,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; + T_REF_INC(pNew->pTscObj); pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { @@ -1994,14 +2005,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup + STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup assert(pTableMeta != NULL); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); - STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); + STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList); @@ -2041,6 +2052,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } + T_REF_INC(pNew->pTscObj); + uint64_t p = (uint64_t) pNew; pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10); return pNew; @@ -2154,6 +2167,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } + int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { const char* msgFormat1 = "invalid SQL: %s"; const char* msgFormat2 = "invalid SQL: \'%s\' (%s)"; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 7fda057483c2ad700083700b5f4b523089610114..4d737ebe66f1e82614d4eb71c1d93129949c5c4e 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -71,7 +71,7 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const * @param pCacheObj Cache object * @param pNode Cache slot object */ -static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); +static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode); /** * remove nodes in trash with refCount == 0 in cache @@ -80,7 +80,7 @@ static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); * @param force force model, if true, remove data in trash without check refcount. * may cause corruption. So, forece model only applys before cache is closed */ -static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force); +static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force); /** * release node @@ -222,7 +222,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v taosTFree(p); } else { - taosAddToTrash(pCacheObj, p); + taosAddToTrashcan(pCacheObj, p); uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data); } } @@ -322,11 +322,11 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { } void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { - if (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) { + if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) { return; } - if (pCacheObj == NULL || (*data) == NULL) { + if ((*data) == NULL) { uError("cache:%s, NULL data to release", pCacheObj->name); return; } @@ -399,19 +399,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { "others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data); assert(p->pTNodeHeader == NULL); - taosAddToTrash(pCacheObj, p); + taosAddToTrashcan(pCacheObj, p); } else { uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); if (ref > 0) { assert(pNode->pTNodeHeader == NULL); - taosAddToTrash(pCacheObj, pNode); + taosAddToTrashcan(pCacheObj, pNode); } else { // ref == 0 atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); - uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", + uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize); if (pCacheObj->freeFp) { @@ -432,6 +432,26 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { char* key = pNode->key; char* p = pNode->data; +// int32_t ref = T_REF_VAL_GET(pNode); +// +// if (ref == 1 && inTrashcan) { +// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be +// // destroyed by refresh worker if decrease ref count before removing it from linked-list. +// assert(pNode->pTNodeHeader->pData == pNode); +// +// __cache_wr_lock(pCacheObj); +// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); +// __cache_unlock(pCacheObj); +// +// ref = T_REF_DEC(pNode); +// assert(ref == 0); +// +// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); +// } else { +// ref = T_REF_DEC(pNode); +// assert(ref >= 0); +// } + int32_t ref = T_REF_DEC(pNode); uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan); } @@ -452,7 +472,7 @@ static bool travHashTableEmptyFn(void* param, void* data) { if (T_REF_VAL_GET(pNode) == 0) { taosCacheReleaseNode(pCacheObj, pNode); } else { // do add to trashcan - taosAddToTrash(pCacheObj, pNode); + taosAddToTrashcan(pCacheObj, pNode); } // this node should be remove from hash table @@ -463,7 +483,7 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); - taosTrashCanEmpty(pCacheObj, false); + taosTrashcanEmpty(pCacheObj, false); } void taosCacheCleanup(SCacheObj *pCacheObj) { @@ -503,7 +523,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * return pNewNode; } -void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { +void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) { if (pNode->inTrashcan) { /* node is already in trash */ assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode); return; @@ -525,11 +545,11 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pCacheObj->numOfElemsInTrash++; __cache_unlock(pCacheObj); - uDebug("cache:%s key:%p, %p move to trash, numOfElem in trash:%d", pCacheObj->name, pNode->key, pNode->data, + uDebug("cache:%s key:%p, %p move to trashcan, numOfElem in trashcan:%d", pCacheObj->name, pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); } -void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { +void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { __cache_wr_lock(pCacheObj); if (pCacheObj->numOfElemsInTrash == 0) { @@ -573,7 +593,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { // todo memory leak if there are object with refcount greater than 0 in hash table? taosHashCleanup(pCacheObj->pHashTable); - taosTrashCanEmpty(pCacheObj, true); + taosTrashcanEmpty(pCacheObj, true); __cache_lock_destroy(pCacheObj); @@ -648,7 +668,7 @@ void* taosCacheTimedRefresh(void *handle) { doCacheRefresh(pCacheObj, now, NULL); } - taosTrashCanEmpty(pCacheObj, false); + taosTrashcanEmpty(pCacheObj, false); } return NULL;