diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a3d3b035e2c4564acd34a71fe1c1490ddc25ec75..e85ade60e509373dd52a37e8774a6bca1c7b9d56 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1675,8 +1675,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); assert(pTableMetaInfo->pTableMeta == NULL); - pTableMetaInfo->pTableMeta = - (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer); + pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, + strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer); // todo handle out of memory case if (pTableMetaInfo->pTableMeta == NULL) { @@ -1879,7 +1879,8 @@ int tscProcessShowRsp(SSqlObj *pSql) { size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer); + pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size, + tsTableMetaKeepTimer); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); if (pQueryInfo->colList == NULL) { @@ -1949,9 +1950,8 @@ int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); - if (pTableMeta == NULL) { - /* not in cache, abort */ + STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); + if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } @@ -1975,7 +1975,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); + STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } @@ -2125,7 +2125,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false); } - pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); + pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 201ace43de171025b902d30d043cea04b755c02a..f1d69fa261ced440841ea4c4a574c5ace3a00594 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -148,7 +148,7 @@ void taos_init_imp() { refreshTime = refreshTime < 10 ? 10 : refreshTime; if (tscCacheHandle == NULL) { - tscCacheHandle = taosCacheInit(refreshTime); + tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL); } tscDebug("client is initialized successfully"); diff --git a/src/inc/query.h b/src/inc/query.h index 5fd2ede034ebfaaf86eecce7a429c33996606027..af3a89682c8c931b4e3aa5db250bb5e082759bdd 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -20,6 +20,7 @@ extern "C" { #endif typedef void* qinfo_t; +typedef void (*_qinfo_free_fn_t)(void*); /** * create the qinfo object according to QueryTableMsg @@ -28,15 +29,13 @@ typedef void* qinfo_t; * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo); /** * Destroy QInfo object * @param qinfo qhandle - * @param fp destroy callback function, while the qhandle is destoried, invoke the fp - * @param param free callback params */ -void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param); +void qDestroyQueryInfo(qinfo_t qinfo); /** * the main query execution function, including query on both table and multitables, @@ -81,11 +80,9 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); /** * kill current ongoing query and free query handle automatically * @param qinfo qhandle - * @param fp destroy callback function, while the qhandle is destoried, invoke the fp - * @param param free callback params * @return */ -int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param); +int32_t qKillQuery(qinfo_t qinfo); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index f3d6a3d344c84cdd03b43300ed0e3e1d416ec50d..a1d4be93c66898cb883850ceff87cac120f143ef 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -68,7 +68,7 @@ int32_t mnodeInitProfile() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); - tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn); + tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, CONN_CHECK_TIME,false, mnodeFreeConn); return 0; } @@ -101,8 +101,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { tstrncpy(connObj.user, user, sizeof(connObj.user)); char key[10]; - sprintf(key, "%u", connId); - SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME); + int32_t len = sprintf(key, "%u", connId); + SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, len, &connObj, sizeof(connObj), CONN_KEEP_TIME); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return pConn; @@ -115,10 +115,10 @@ void mnodeReleaseConn(SConnObj *pConn) { SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { char key[10]; - sprintf(key, "%u", connId); + int32_t len = sprintf(key, "%u", connId); uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); - SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime); + SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, len, expireTime); if (pConn == NULL) { mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return NULL; @@ -547,7 +547,7 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); - SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); if (pConn == NULL) { mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); return TSDB_CODE_MND_INVALID_CONN_ID; @@ -577,7 +577,7 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); - SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); if (pConn == NULL) { mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); return TSDB_CODE_MND_INVALID_CONN_ID; @@ -594,7 +594,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS; SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; - SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId); + SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, pKill->queryId, strlen(pKill->queryId)); if (pConn == NULL) { mError("connId:%s, failed to kill, conn not exist", pKill->queryId); return TSDB_CODE_MND_INVALID_CONN_ID; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 06ef2cb452fb063f642b12c6b3c7c5cf44d1b673..12b434a5131fc5a3188e383f4e25b62262f4e5d1 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -65,7 +65,7 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsMnodeShowCache = taosCacheInitWithCb(5, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, mnodeFreeShowObj); return 0; } @@ -365,9 +365,9 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { static bool mnodeAccquireShowObj(SShowObj *pShow) { char key[10]; - sprintf(key, "%d", pShow->index); + int32_t len = sprintf(key, "%d", pShow->index); - SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key); + SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, key, len); if (pSaved == pShow) { mDebug("%p, show is accquired from cache", pShow); return true; @@ -380,9 +380,9 @@ static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { if (tsMnodeShowCache != NULL) { char key[10]; pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); - sprintf(key, "%d", pShow->index); + int32_t len = sprintf(key, "%d", pShow->index); - SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 6); + SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, len, pShow, size, 6); free(pShow); mDebug("%p, show is put into cache, index:%s", newQhandle, key); diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index b09f34b562294e13f9ee7e9e728f1ad974f8c9b1..46e7fd45fac42418650ce0dfdb0b0abb8316b1ed 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) { } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext); + tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 2, false, httpDestroyContext); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; @@ -104,14 +104,14 @@ HttpContext *httpCreateContext(int32_t fd) { if (pContext == NULL) return NULL; char contextStr[16] = {0}; - snprintf(contextStr, sizeof(contextStr), "%p", pContext); + int32_t keySize = snprintf(contextStr, sizeof(contextStr), "%p", pContext); pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; - HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3); + HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, keySize, &pContext, sizeof(HttpContext *), 3); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext); @@ -123,9 +123,9 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) { char contextStr[16] = {0}; - snprintf(contextStr, sizeof(contextStr), "%p", ptr); + int32_t len = snprintf(contextStr, sizeof(contextStr), "%p", ptr); - HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr); + HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, contextStr, len); if (ppContext) { HttpContext *pContext = *ppContext; diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index 83602e1291646d2f574b2628569eb5a95fa8e2de..3a901167d522adb725537f8ed5834bd68e1b6cb4 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -33,9 +33,9 @@ void httpCreateSession(HttpContext *pContext, void *taos) { memset(&session, 0, sizeof(HttpSession)); session.taos = taos; session.refCount = 1; - snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire); + pContext->session = taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire); // void *temp = pContext->session; // taosCacheRelease(server->sessionCache, (void **)&temp, false); @@ -57,9 +57,9 @@ static void httpFetchSessionImp(HttpContext *pContext) { pthread_mutex_lock(&server->serverMutex); char sessionId[HTTP_SESSION_ID_LEN]; - snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); + pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len); if (pContext->session != NULL) { atomic_add_fetch_32(&pContext->session->refCount, 1); httpDebug("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, @@ -115,7 +115,7 @@ void httpCleanUpSessions() { } bool httpInitSessions() { - tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession); + tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession); if (tsHttpServer.sessionCache == NULL) { httpError("failed to init session cache"); return false; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a83072384595d96fc60cc57ff35b1502c4b83b49..d7d59230b65bff91d28f42e732a54acb176019f7 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -27,6 +27,7 @@ #include "tref.h" #include "tsdb.h" #include "tsqlfunction.h" +#include "query.h" struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); @@ -181,13 +182,13 @@ typedef struct SQueryRuntimeEnv { } SQueryRuntimeEnv; typedef struct SQInfo { - void* signature; - int32_t pointsInterpo; - int32_t code; // error code to returned to client - sem_t dataReady; - void* tsdb; - int32_t vgId; - + void* signature; + int32_t pointsInterpo; + int32_t code; // error code to returned to client + sem_t dataReady; + void* tsdb; + void* param; + int32_t vgId; STableGroupInfo tableGroupInfo; // table id list < only includes the STable list> STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; @@ -202,8 +203,9 @@ typedef struct SQInfo { * We later may refactor to remove this attribution by using another flag to denote * whether a multimeter query is completed or not. */ - int32_t tableIndex; - int32_t numOfGroupResultPages; + int32_t tableIndex; + int32_t numOfGroupResultPages; + _qinfo_free_fn_t fn; } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5a17e4e1e842b785538546ab0ee149f9b6e5dcc6..f33d739ba1f41e22c90ee33f6cc6133febbe0bee 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4069,7 +4069,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { +int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery, void* freeParam, _qinfo_free_fn_t fn) { int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4083,14 +4083,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; + pQInfo->param = freeParam; + pQInfo->fn = fn; pRuntimeEnv->pQuery = pQuery; - pRuntimeEnv->pTSBuf = param; + pRuntimeEnv->pTSBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; - if (param != NULL) { + if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } @@ -5697,8 +5699,7 @@ static bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } - -static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) { +static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -5731,7 +5732,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable, param, fn)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -5894,7 +5895,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { return TSDB_CODE_SUCCESS; } -int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, + qinfo_t* pQInfo) { assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -5984,7 +5986,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi goto _over; } - code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery); + code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn); _over: free(tagCond); @@ -6020,7 +6022,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) { freeQInfo(pQInfo); } -void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { +void qDestroyQueryInfo(qinfo_t qHandle) { SQInfo* pQInfo = (SQInfo*) qHandle; if (!isValidQInfo(pQInfo)) { return; @@ -6030,11 +6032,15 @@ void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); if (ref == 0) { - doDestoryQueryInfo(pQInfo); + _qinfo_free_fn_t fn = pQInfo->fn; + void* param = pQInfo->param; - if (fp != NULL) { - fp(param); + doDestoryQueryInfo(pQInfo); + if (fn != NULL) { + assert(param != NULL); + fn(param); } + } } @@ -6048,7 +6054,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); - qDestroyQueryInfo(pQInfo, fp, param); + qDestroyQueryInfo(pQInfo); return; } @@ -6069,7 +6075,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { } sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo, fp, param); + qDestroyQueryInfo(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -6162,7 +6168,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return code; } -int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { +int32_t qKillQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { @@ -6170,8 +6176,7 @@ int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { } setQueryKilled(pQInfo); - qDestroyQueryInfo(pQInfo, fp, param); - + qDestroyQueryInfo(pQInfo); return TSDB_CODE_SUCCESS; } diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 17b38238316bf74d67824b119b774a8cd5804860..cd3d0d436f68b25fdc9fa71175acaaea5b5f408c 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -24,6 +24,8 @@ extern "C" { #include "tref.h" #include "hash.h" +typedef void (*__cache_freeres_fn_t)(void*); + typedef struct SCacheStatis { int64_t missCount; int64_t hitCount; @@ -34,14 +36,15 @@ typedef struct SCacheStatis { typedef struct SCacheDataNode { uint64_t addedTime; // the added time when this element is added or updated into cache - uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache + uint64_t lifespan; // expiredTime expiredTime when this element should be remove from cache uint64_t signature; uint32_t size; // allocated size for current SCacheDataNode - uint16_t keySize: 15; - bool inTrashCan: 1;// denote if it is in trash or not T_REF_DECLARE() - char *key; - char data[]; + uint16_t keySize: 15; // max key size: 32kb + bool inTrashCan: 1;// denote if it is in trash or not + int32_t extendFactor; // number of life span extend + char *key; + char data[]; } SCacheDataNode; typedef struct STrashElem { @@ -62,29 +65,32 @@ typedef struct { int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t refreshTime; STrashElem * pTrash; - void * tmrCtrl; - void * pTimer; +// void * tmrCtrl; +// void * pTimer; SCacheStatis statistics; SHashObj * pHashTable; - _hash_free_fn_t freeFp; + __cache_freeres_fn_t freeFp; uint32_t numOfElemsInTrash; // number of element in trash uint8_t deleting; // set the deleting flag to stop refreshing ASAP. pthread_t refreshWorker; - + bool extendLifespan; // auto extend life span when one item is accessed. #if defined(LINUX) pthread_rwlock_t lock; #else - pthread_mutex_t lock; + pthread_mutex_t lock; #endif } SCacheObj; /** * initialize the cache object - * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and - * not referenced by other objects + * @param keyType key type + * @param refreshTimeInSeconds refresh operation interval time, the maximum survival time when one element is expired + * and not referenced by other objects + * @param extendLifespan auto extend lifespan, if accessed + * @param fn free resource callback function * @return */ -SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds); +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); /** * initialize the cache object and set the free object callback function @@ -92,7 +98,7 @@ SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds); * @param freeCb * @return */ -SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void *data)); +SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); /** * add data into cache @@ -104,7 +110,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void * @param keepTime survival time in second * @return cached element */ -void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int keepTimeInSeconds); +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int keepTimeInSeconds); /** * get data from cache @@ -112,22 +118,23 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz * @param key key * @return cached data or NULL */ -void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); +void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen); /** * update the expire time of data in cache * @param pCacheObj cache object * @param key key + * @param keyLen keyLen * @param expireTime new expire time of data * @return */ -void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime); +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime); /** * Add one reference count for the exist data, and assign this data for a new owner. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. - * This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of - * the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again. + * This procedure is a faster version of taosCacheAcquireByKey function, which avoids the sideeffect of the problem of + * the data is moved to trash, and taosCacheAcquireByKey will fail to retrieve it again. * * @param handle * @param data @@ -148,8 +155,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data); * if it is referenced by other object, it will be remain in cache * @param handle cache object * @param data not the key, actually referenced data - * @param _remove force model, reduce the ref count and move the data into - * pTrash + * @param _remove force model, reduce the ref count and move the data into pTrash */ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 720741f0893086d4c9c3fb59bee0adfe4dc3f534..d763472a12dbff7ee7db6a2fc6b6a887e63a7465 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -119,7 +119,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo int32_t size = pNode->size; taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - uDebug("key:%s, is removed from cache, total:%" PRId64 " size:%d bytes", pNode->key, pCacheObj->totalSize, size); + uDebug("key:%p, is removed from cache,total:%" PRId64 ",size:%dbytes", pNode->key, pCacheObj->totalSize, size); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); free(pNode); } @@ -167,7 +167,7 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode // update the timestamp information for updated key/value pNewNode->addedTime = taosGetTimestampMs(); - pNewNode->expiredTime = pNewNode->addedTime + duration; + pNewNode->lifespan = duration; T_REF_INC(pNewNode); @@ -224,8 +224,8 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); */ static void* taosCacheRefresh(void *handle); -SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) { - if (refreshTime <= 0) { +SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { + if (refreshTimeInSeconds <= 0) { return NULL; } @@ -235,7 +235,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return NULL; } - pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); + pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false); if (pCacheObj->pHashTable == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); @@ -243,10 +243,9 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) } // set free cache node callback function for hash table - // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); - - pCacheObj->freeFp = freeCb; - pCacheObj->refreshTime = refreshTime * 1000; + pCacheObj->freeFp = fn; + pCacheObj->refreshTime = refreshTimeInSeconds * 1000; + pCacheObj->extendLifespan = extendLifespan; if (__cache_lock_init(pCacheObj) != 0) { taosHashCleanup(pCacheObj->pHashTable); @@ -256,7 +255,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return NULL; } - pthread_attr_t thattr; + pthread_attr_t thattr = {{0}}; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -266,19 +265,17 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return pCacheObj; } -SCacheObj *taosCacheInit(int64_t refreshTime) { - return taosCacheInitWithCb(refreshTime, NULL); +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { + return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn); } -void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) { +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { SCacheDataNode *pNode; if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) { return NULL; } - - size_t keyLen = strlen(key); - + __cache_wr_lock(pCacheObj); SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL; @@ -288,14 +285,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz if (NULL != pNode) { pCacheObj->totalSize += pNode->size; - uDebug("key:%s, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", - key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize); + uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", + key, pNode, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->totalSize, dataSize); } else { - uError("key:%s, failed to added into cache, out of memory", key); + uError("key:%p, failed to added into cache, out of memory", key); } } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); - uDebug("key:%s, %p exist in cache, updated", key, pNode); + uDebug("key:%p, %p exist in cache, updated", key, pNode); } __cache_unlock(pCacheObj); @@ -303,57 +300,65 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz return (pNode != NULL) ? pNode->data : NULL; } -void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { +void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { return NULL; } - - uint32_t keyLen = (uint32_t)strlen(key); - + __cache_rd_lock(pCacheObj); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); + + int32_t ref = 0; if (ptNode != NULL) { - T_REF_INC(*ptNode); + ref = T_REF_INC(*ptNode); + + // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan + if (pCacheObj->extendLifespan) { + int64_t now = taosGetTimestampMs(); + + if ((now - (*ptNode)->addedTime) < (*ptNode)->lifespan * (*ptNode)->extendFactor) { + (*ptNode)->extendFactor += 1; + uDebug("key:%p extend life time to %"PRId64, key, (*ptNode)->lifespan * (*ptNode)->extendFactor + (*ptNode)->addedTime); + } + } } - __cache_unlock(pCacheObj); if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%s, is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + uDebug("key:%p, is retrieved from cache, %p refcnt:%d", key, (*ptNode), ref); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%s, not in cache, retrieved failed", key); + uDebug("key:%p, not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); return (ptNode != NULL) ? (*ptNode)->data : NULL; } -void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) { +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { return NULL; } - - uint32_t keyLen = (uint32_t)strlen(key); - + __cache_rd_lock(pCacheObj); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); if (ptNode != NULL) { T_REF_INC(*ptNode); - (*ptNode)->expiredTime = expireTime; + (*ptNode)->extendFactor += 1; +// (*ptNode)->lifespan = expireTime; } __cache_unlock(pCacheObj); if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%s, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + uDebug("key:%p, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%s, not in cache, retrieved failed", key); + uDebug("key:%p, not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -373,7 +378,17 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { int32_t ref = T_REF_INC(ptNode); uDebug("%p acquired by data in cache, refcnt:%d", ptNode, ref) - + + // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan + if (pCacheObj->extendLifespan) { + int64_t now = taosGetTimestampMs(); + + if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { + ptNode->extendFactor += 1; + uDebug("key:%p extend life time to %"PRId64, ptNode, ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); + } + } + // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); return data; @@ -408,21 +423,27 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); - if (pNode->signature != (uint64_t)pNode) { - uError("%p release invalid cache data", pNode); + uError("key:%p, release invalid cache data", pNode); return; } *data = NULL; - int32_t ref = T_REF_DEC(pNode); - uDebug("key:%s, is released, %p refcnt:%d", pNode->key, pNode, ref); + int16_t ref = T_REF_DEC(pNode); + uDebug("%p data released, refcnt:%d", pNode, ref); - if (_remove) { + if (_remove && (!pNode->inTrashCan)) { __cache_wr_lock(pCacheObj); - // pNode may be released immediately by other thread after the reference count of pNode is set to 0, - // So we need to lock it in the first place. - taosCacheMoveToTrash(pCacheObj, pNode); + + if (T_REF_VAL_GET(pNode) == 0) { + // remove directly, if not referenced by other users + taosCacheReleaseNode(pCacheObj, pNode); + } else { + // pNode may be released immediately by other thread after the reference count of pNode is set to 0, + // So we need to lock it in the first place. + taosCacheMoveToTrash(pCacheObj, pNode); + } + __cache_unlock(pCacheObj); } } @@ -473,11 +494,11 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * memcpy(pNewNode->key, key, keyLen); - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->expiredTime = pNewNode->addedTime + duration; - - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)totalSize; + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->lifespan = duration; + pNewNode->extendFactor = 1; + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)totalSize; return pNewNode; } @@ -501,7 +522,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pNode->inTrashCan = true; pCacheObj->numOfElemsInTrash++; - uDebug("key:%s, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); + uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); } void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { @@ -522,7 +543,11 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { } pElem->pData->signature = 0; - if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data); + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pElem->pData->data); + } + + uError("-------------------free obj:%p", pElem->pData); free(pElem->pData); free(pElem); } @@ -549,7 +574,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { } if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { - uDebug("key:%s, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, + uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, pCacheObj->numOfElemsInTrash - 1); STrashElem *p = pElem; @@ -573,7 +598,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { if (T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); } else { - uDebug("key:%s, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode)); + uDebug("key:%p, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode)); } } taosHashDestroyIter(pIter); @@ -613,27 +638,32 @@ void* taosCacheRefresh(void *handle) { // reset the count value count = 0; - size_t num = taosHashGetSize(pCacheObj->pHashTable); - if (num == 0) { + size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable); + if (elemInHash + pCacheObj->numOfElemsInTrash == 0) { continue; } - uint64_t expiredTime = taosGetTimestampMs(); pCacheObj->statistics.refreshCount++; - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + // refresh data in hash table + if (elemInHash > 0) { + int64_t expiredTime = taosGetTimestampMs(); - __cache_wr_lock(pCacheObj); - while (taosHashIterNext(pIter)) { - SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + + __cache_wr_lock(pCacheObj); + while (taosHashIterNext(pIter)) { + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } } - } - __cache_unlock(pCacheObj); + __cache_unlock(pCacheObj); + + taosHashDestroyIter(pIter); + } - taosHashDestroyIter(pIter); taosTrashCanEmpty(pCacheObj, false); } diff --git a/src/util/tests/cacheTest.cpp b/src/util/tests/cacheTest.cpp index 5762d5700bbb1945490846dcfaaf0e12f0fbaa27..43ac689ff47745cb2884455e39581f19586c7e7e 100644 --- a/src/util/tests/cacheTest.cpp +++ b/src/util/tests/cacheTest.cpp @@ -19,12 +19,12 @@ int32_t tsMaxMeterConnections = 200; // test cache TEST(testCase, client_cache_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - SCacheObj* tscCacheHandle = taosCacheInit(REFRESH_TIME_IN_SEC); + SCacheObj* tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL); const char* key1 = "test1"; char data1[] = "test11"; - char* cachedObj = (char*) taosCachePut(tscCacheHandle, key1, data1, strlen(data1)+1, 1); + char* cachedObj = (char*) taosCachePut(tscCacheHandle, key1, strlen(key1), data1, strlen(data1)+1, 1); sleep(REFRESH_TIME_IN_SEC+1); printf("obj is still valid: %s\n", cachedObj); @@ -33,7 +33,7 @@ TEST(testCase, client_cache_test) { taosCacheRelease(tscCacheHandle, (void**) &cachedObj, false); /* the object is cleared by cache clean operation */ - cachedObj = (char*) taosCachePut(tscCacheHandle, key1, data2, strlen(data2)+1, 20); + cachedObj = (char*) taosCachePut(tscCacheHandle, key1, strlen(key1), data2, strlen(data2)+1, 20); printf("after updated: %s\n", cachedObj); printf("start to remove data from cache\n"); @@ -43,32 +43,32 @@ TEST(testCase, client_cache_test) { const char* key3 = "test2"; const char* data3 = "kkkkkkk"; - char* cachedObj2 = (char*) taosCachePut(tscCacheHandle, key3, data3, strlen(data3) + 1, 1); + char* cachedObj2 = (char*) taosCachePut(tscCacheHandle, key3, strlen(key3), data3, strlen(data3) + 1, 1); printf("%s\n", cachedObj2); taosCacheRelease(tscCacheHandle, (void**) &cachedObj2, false); sleep(3); - char* d = (char*) taosCacheAcquireByName(tscCacheHandle, key3); + char* d = (char*) taosCacheAcquireByKey(tscCacheHandle, key3, strlen(key3)); // assert(d == NULL); char key5[] = "test5"; char data5[] = "data5kkkkk"; - cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, data5, strlen(data5) + 1, 20); + cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, strlen(key5), data5, strlen(data5) + 1, 20); const char* data6= "new Data after updated"; taosCacheRelease(tscCacheHandle, (void**) &cachedObj2, false); - cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, data6, strlen(data6) + 1, 20); + cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, strlen(key5), data6, strlen(data6) + 1, 20); printf("%s\n", cachedObj2); taosCacheRelease(tscCacheHandle, (void**) &cachedObj2, true); const char* data7 = "add call update procedure"; - cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, data7, strlen(data7) + 1, 20); + cachedObj2 = (char*) taosCachePut(tscCacheHandle, key5, strlen(key5), data7, strlen(data7) + 1, 20); printf("%s\n=======================================\n\n", cachedObj2); - char* cc = (char*) taosCacheAcquireByName(tscCacheHandle, key5); + char* cc = (char*) taosCacheAcquireByKey(tscCacheHandle, key5, strlen(key5)); taosCacheRelease(tscCacheHandle, (void**) &cachedObj2, true); taosCacheRelease(tscCacheHandle, (void**) &cc, false); @@ -76,7 +76,7 @@ TEST(testCase, client_cache_test) { const char* data8 = "ttft"; const char* key6 = "key6"; - char* ft = (char*) taosCachePut(tscCacheHandle, key6, data8, strlen(data8), 20); + char* ft = (char*) taosCachePut(tscCacheHandle, key6, strlen(key6), data8, strlen(data8), 20); taosCacheRelease(tscCacheHandle, (void**) &ft, false); /** @@ -85,7 +85,7 @@ TEST(testCase, client_cache_test) { uint64_t startTime = taosGetTimestampUs(); printf("Cache Performance Test\nstart time:%" PRIu64 "\n", startTime); for(int32_t i=0; i<1000; ++i) { - char* dd = (char*) taosCacheAcquireByName(tscCacheHandle, key6); + char* dd = (char*) taosCacheAcquireByKey(tscCacheHandle, key6, strlen(key6)); if (dd != NULL) { // printf("get the data\n"); } else { @@ -105,7 +105,7 @@ TEST(testCase, client_cache_test) { TEST(testCase, cache_resize_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - auto* pCache = taosCacheInit(REFRESH_TIME_IN_SEC); + auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL); char key[256] = {0}; char data[1024] = "abcdefghijk"; @@ -116,7 +116,7 @@ TEST(testCase, cache_resize_test) { for(int32_t i = 0; i < num; ++i) { int32_t len = sprintf(key, "abc_%7d", i); - taosCachePut(pCache, key, data, len, 3600); + taosCachePut(pCache, key, strlen(key), data, len, 3600); } uint64_t endTime = taosGetTimestampUs(); @@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) { startTime = taosGetTimestampUs(); for(int32_t i = 0; i < num; ++i) { int32_t len = sprintf(key, "abc_%7d", i); - void* k = taosCacheAcquireByName(pCache, key); + void* k = taosCacheAcquireByKey(pCache, key, len); assert(k != 0); } endTime = taosGetTimestampUs(); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index e428eae688505f8930e0f981cb02326fea981752..76e53f3962ed55cf1a3aee875d10b83b22ac9c37 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -53,6 +53,7 @@ typedef struct { STsdbCfg tsdbCfg; SSyncCfg syncCfg; SWalCfg walCfg; + void *qHandlePool; // query handle pool char *rootDir; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f71f6adefbe3923aa877bdbc0ad3a93ffb5a8cc8..192998c8a6060a58ed91075b3ab9978592a2d2f0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -15,19 +15,22 @@ #define _DEFAULT_SOURCE #include "os.h" + +#include "tcache.h" +#include "cJSON.h" +#include "dnode.h" #include "hash.h" #include "taoserror.h" #include "taosmsg.h" -#include "tutil.h" +#include "tglobal.h" #include "trpc.h" #include "tsdb.h" #include "ttime.h" #include "ttimer.h" -#include "cJSON.h" -#include "tglobal.h" -#include "dnode.h" +#include "tutil.h" #include "vnode.h" #include "vnodeInt.h" +#include "query.h" #define TSDB_VNODE_VERSION_CONTENT_LEN 31 @@ -43,6 +46,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static void vnodeFreeqHandle(void* phandle); static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -279,6 +283,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); + const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool + pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle); + pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); @@ -848,12 +855,12 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) { goto PARSE_OVER; } - cJSON *version = cJSON_GetObjectItem(root, "version"); - if (!version || version->type != cJSON_Number) { + cJSON *ver = cJSON_GetObjectItem(root, "version"); + if (!ver || ver->type != cJSON_Number) { vError("vgId:%d, failed to read vnode version, version not found", pVnode->vgId); goto PARSE_OVER; } - pVnode->version = version->valueint; + pVnode->version = ver->valueint; terrno = TSDB_CODE_SUCCESS; vInfo("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); @@ -864,3 +871,12 @@ PARSE_OVER: if(fp) fclose(fp); return terrno; } + +void vnodeFreeqHandle(void *qHandle) { + void** handle = qHandle; + if (handle == NULL || *handle == NULL) { + return; + } + + qKillQuery(*handle); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 0c08c77e329828396e7c6760814f15e247c8977d..52adffff7e8c3fc3dc611d984ec90cafb813dbd1 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -15,20 +15,21 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" + +#include "tglobal.h" #include "taoserror.h" -#include "tqueue.h" +#include "taosmsg.h" +#include "tcache.h" +#include "query.h" #include "trpc.h" #include "tsdb.h" -#include "twal.h" -#include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" -#include "query.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; @@ -58,19 +59,6 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -// notify connection(handle) that current qhandle is created, if current connection from -// client is broken, the query needs to be killed immediately. -static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { - SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); - killQueryMsg->qhandle = htobe64((uint64_t) qhandle); - killQueryMsg->free = htons(1); - killQueryMsg->header.vgId = htonl(vgId); - killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); - - vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); - return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); -} - static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -88,18 +76,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - // this message arrived here by means of the query message, so release the vnode is necessary - qKillQuery((qinfo_t) killQueryMsg->qhandle, vnodeRelease, pVnode); - vnodeRelease(pVnode); + // this message arrived here by means of the *query* message, so release the vnode is necessary + void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); + if (qhandle == NULL || *qhandle == NULL) { // todo handle invalid qhandle error + } else { +// qKillQuery((qinfo_t) killQueryMsg->qhandle); + taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); + } + + vnodeRelease(pVnode); return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code } int32_t code = TSDB_CODE_SUCCESS; qinfo_t pQInfo = NULL; + void** handle = NULL; if (contLen != 0) { - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); @@ -116,13 +111,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // NOTE: there two refcount, needs to kill twice, todo refactor - qKillQuery(pQInfo, vnodeRelease, pVnode); - qKillQuery(pQInfo, vnodeRelease, pVnode); + // query has not been put into qhandle pool, kill it directly. + qKillQuery(pQInfo); + qKillQuery(pQInfo); return pRsp->code; } - vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); + handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); + assert(*handle == pQInfo); } else { assert(pQInfo == NULL); vnodeRelease(pVnode); @@ -138,6 +135,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pQInfo != NULL) { qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query + taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); } return code; @@ -152,10 +150,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRetrieve->free = htons(pRetrieve->free); memset(pRet, 0, sizeof(SRspRet)); + int32_t ret = 0; + + void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, &pQInfo, sizeof(pQInfo)); + if (handle == NULL || *handle != pQInfo) { + ret = TSDB_CODE_QRY_INVALID_QHANDLE; + } if (pRetrieve->free == 1) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - int32_t ret = qKillQuery(pQInfo, vnodeRelease, pVnode); + + taosCacheRelease(pVnode->qHandlePool, handle, true); +// int32_t ret = qKillQuery(pQInfo); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -184,10 +190,24 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; } else { // no further execution invoked, release the ref to vnode - qDestroyQueryInfo(pQInfo, vnodeRelease, pVnode); + taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); +// qDestroyQueryInfo(pQInfo); } } vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); return code; } + +// notify connection(handle) that current qhandle is created, if current connection from +// client is broken, the query needs to be killed immediately. +int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { + SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + killQueryMsg->qhandle = htobe64((uint64_t) qhandle); + killQueryMsg->free = htons(1); + killQueryMsg->header.vgId = htonl(vgId); + killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + + vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); + return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); +} \ No newline at end of file