diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c482375bff19e007fe2ece0eacc7ae8370696019..f5dfdaf779bd24a702ec859e8eb6f3ccfad8a20c 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -333,7 +333,7 @@ typedef struct STscObj { char superAuth : 1; uint32_t connId; uint64_t rid; // ref ID returned by taosAddRef - struct SSqlObj * pHb; + int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; void* pDnodeConn; @@ -373,7 +373,7 @@ typedef struct SSqlObj { struct SSqlObj **pSubs; struct SSqlObj *prev, *next; - struct SSqlObj **self; + int64_t self; } SSqlObj; typedef struct SSqlStream { @@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } extern SCacheObj* tscMetaCache; -extern SCacheObj* tscObjCache; +extern int tscObjRef; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 538e652f3c6577098363565a4e76fa637b60709c..4c28adc261f6b83b3c1bbb68d3dd3d56e6007fc9 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; - if (pObj->pHb != NULL) { - if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { + int32_t code = pHb->res.code; + taosReleaseRef(tscObjRef, pObj->hbrid); + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; return pSql->res.code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9735e26ece0ee107fe7e9464d050d7fb6c665dbe..5d3b86caf33295292944f5eded6fd2cc94d42ba9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); + tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); } - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); @@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = taosAcquireRef(tscRefId, rid); if (pObj == NULL) return; - SSqlObj* pHB = pObj->pHb; - - void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj->rid); - return; - } - - assert(*pHB->self == pHB); + SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid); + assert(pHB->self == pObj->hbrid); pHB->retry = 0; int32_t code = tscProcessSql(pHB); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); @@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, - .ahandle = pSql, + .ahandle = (void*)pSql->self, .handle = NULL, .code = 0 }; @@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); return; } - - SSqlObj* pSql = *p; - assert(pSql != NULL); + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - assert(*pSql->self == pSql); pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); - - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); + taosReleaseRef(tscObjRef, pSql->self); if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - taosCacheRelease(tscObjCache, (void **)&p, true); + taosRemoveRef(tscObjRef, pSql->self); tscDebug("%p sqlObj is automatically freed", pSql); } @@ -2020,7 +2007,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { // TODO multithread problem static void createHBObj(STscObj* pObj) { - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { return; } @@ -2052,7 +2039,7 @@ static void createHBObj(STscObj* pObj) { registerSqlObj(pSql); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); - pObj->pHb = pSql; + pObj->hbrid = pSql->self; } int tscProcessConnectRsp(SSqlObj *pSql) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index de0177647234d0b63fa21a3b0fe2ec5763f40da3..dcca63ff0ebc2b6939c1bfa074ddfad15e5f6560 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -276,8 +276,8 @@ void taos_close(TAOS *taos) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - SSqlObj* pHb = pObj->pHb; - if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { if (pHb->rpcRid > 0) { // wait for rsp from dnode rpcCancelRequest(pHb->rpcRid); pHb->rpcRid = -1; @@ -285,6 +285,7 @@ void taos_close(TAOS *taos) { tscDebug("%p HB is freed", pHb); taos_free_result(pHb); + taosReleaseRef(tscObjRef, pHb->self); } int32_t ref = T_REF_DEC(pObj); @@ -606,8 +607,7 @@ void taos_free_result(TAOS_RES *res) { bool freeNow = tscKillQueryInDnode(pSql); if (freeNow) { tscDebug("%p free sqlObj in cache", pSql); - SSqlObj** p = pSql->self; - taosCacheRelease(tscObjCache, (void**) &p, true); + taosReleaseRef(tscObjRef, pSql->self); } } @@ -700,13 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { continue; } - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); + SSqlObj* pSubObj = pSub; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSubObj->rpcRid > 0) { @@ -715,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscQueueAsyncRes(pSubObj); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSubObj->self); } tscDebug("%p super table query cancelled", pSql); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index a782b53e75513a14b4fe3c297adb077e769ceecd..043e78a9f3c85fa57f776c2212415e12565e3685 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* fail: tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); if (pSql != NULL) { - if (pSql->self != NULL) { - taos_free_result(pSql); + if (pSql->self != 0) { + taosReleaseRef(tscObjRef, pSql->self); } else { tscFreeSqlObj(pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6ebbeeef411f8922662e7045027cf55410cbf89d..819a323db5e947df46158b9665c0f5d5c491f532 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); + // free the data block created from insert sql string + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return code; // here the pSql may have been released already. diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 9e9a00550a4497ae11cbdffe48991c2fd7b742d6..91eabda78b8082355e9b48b0cf52c5495304710b 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -15,7 +15,7 @@ #include "os.h" #include "taosmsg.h" -#include "tcache.h" +#include "tref.h" #include "trpc.h" #include "tsystem.h" #include "ttimer.h" @@ -31,7 +31,7 @@ // global, not configurable SCacheObj* tscMetaCache; -SCacheObj* tscObjCache; +int tscObjRef = -1; void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; @@ -144,7 +144,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); - tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); + tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); } tscRefId = taosOpenRef(200, tscCloseTscObj); @@ -167,9 +167,9 @@ void taos_cleanup(void) { taosCacheCleanup(m); } - m = tscObjCache; - if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { - taosCacheCleanup(m); + int refId = atomic_exchange_32(&tscObjRef, -1); + if (refId != -1) { + taosCloseRef(refId); } m = tscQhandle; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d22c2588cca37d03b10c506856426068fcf79a78..af17f9b45729ea244a36b76218a0f1aed6de7db0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -447,20 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { void tscFreeRegisteredSqlObj(void *pSql) { assert(pSql != NULL); - SSqlObj** p = (SSqlObj**)pSql; - STscObj* pTscObj = (*p)->pTscObj; + SSqlObj* p = *(SSqlObj**)pSql; + STscObj* pTscObj = p->pTscObj; - assert((*p)->self != 0 && (*p)->self == (p)); - - SSqlObj* ptr = *p; - tscFreeSqlObj(*p); + assert(p->self != 0); + tscFreeSqlObj(p); int32_t ref = T_REF_DEC(pTscObj); assert(ref >= 0); - tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", ptr, pTscObj, ref); + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref); if (ref == 0) { - tscDebug("%p all sqlObj freed, free tscObj:%p", ptr, pTscObj); + tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid); } } @@ -840,7 +838,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); - dataBuf->numOfTables += 1; } @@ -1565,19 +1562,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } -void tscSetFreeHeatBeat(STscObj* pObj) { - if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { - return; - } - - SSqlObj* pHeatBeat = pObj->pHb; - assert(pHeatBeat == pHeatBeat->signature); - - // to denote the heart-beat timer close connection and free all allocated resources - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); - pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; -} - /* * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing @@ -1596,7 +1580,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { + if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) { return false; } @@ -1888,13 +1872,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { } void registerSqlObj(SSqlObj* pSql) { - int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec - int32_t ref = T_REF_INC(pSql->pTscObj); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); - TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; - pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); + pSql->self = taosAddRef(tscObjRef, pSql); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {