From d616e0f1c8912b0569bb617fe0dfb19996579dcf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Jul 2020 17:15:57 +0800 Subject: [PATCH] [td-822] --- src/client/src/tscServer.c | 16 ++-- src/client/src/tscSystem.c | 2 +- src/inc/query.h | 11 +-- src/mnode/src/mnodeProfile.c | 16 ++-- src/mnode/src/mnodeShow.c | 10 +- src/plugins/http/src/httpContext.c | 10 +- src/plugins/http/src/httpSession.c | 10 +- src/query/inc/qExecutor.h | 20 ++-- src/query/src/qExecutor.c | 38 ++++---- src/util/inc/tcache.h | 48 +++++----- src/util/src/tcache.c | 141 +++++++++++++++++------------ src/vnode/inc/vnodeInt.h | 1 + src/vnode/src/vnodeMain.c | 30 ++++-- src/vnode/src/vnodeRead.c | 74 +++++++++------ 14 files changed, 249 insertions(+), 178 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8dd5ef69f7..eb3235d9e4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1674,8 +1674,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); assert(pTableMetaInfo->pTableMeta == NULL); - pTableMetaInfo->pTableMeta = - (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer); + pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, + strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer); // todo handle out of memory case if (pTableMetaInfo->pTableMeta == NULL) { @@ -1878,7 +1878,8 @@ int tscProcessShowRsp(SSqlObj *pSql) { size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer); + pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size, + tsTableMetaKeepTimer); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); if (pQueryInfo->colList == NULL) { @@ -1948,9 +1949,8 @@ int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); - if (pTableMeta == NULL) { - /* not in cache, abort */ + STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); + if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } @@ -1974,7 +1974,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); + STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMeta == NULL) { /* not in cache, abort */ return 0; } @@ -2124,7 +2124,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false); } - pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); + pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 201ace43de..f1d69fa261 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -148,7 +148,7 @@ void taos_init_imp() { refreshTime = refreshTime < 10 ? 10 : refreshTime; if (tscCacheHandle == NULL) { - tscCacheHandle = taosCacheInit(refreshTime); + tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL); } tscDebug("client is initialized successfully"); diff --git a/src/inc/query.h b/src/inc/query.h index 5fd2ede034..af3a89682c 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -20,6 +20,7 @@ extern "C" { #endif typedef void* qinfo_t; +typedef void (*_qinfo_free_fn_t)(void*); /** * create the qinfo object according to QueryTableMsg @@ -28,15 +29,13 @@ typedef void* qinfo_t; * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo); /** * Destroy QInfo object * @param qinfo qhandle - * @param fp destroy callback function, while the qhandle is destoried, invoke the fp - * @param param free callback params */ -void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param); +void qDestroyQueryInfo(qinfo_t qinfo); /** * the main query execution function, including query on both table and multitables, @@ -81,11 +80,9 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); /** * kill current ongoing query and free query handle automatically * @param qinfo qhandle - * @param fp destroy callback function, while the qhandle is destoried, invoke the fp - * @param param free callback params * @return */ -int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param); +int32_t qKillQuery(qinfo_t qinfo); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index f3d6a3d344..a1d4be93c6 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -68,7 +68,7 @@ int32_t mnodeInitProfile() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); - tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn); + tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, CONN_CHECK_TIME,false, mnodeFreeConn); return 0; } @@ -101,8 +101,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { tstrncpy(connObj.user, user, sizeof(connObj.user)); char key[10]; - sprintf(key, "%u", connId); - SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME); + int32_t len = sprintf(key, "%u", connId); + SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, len, &connObj, sizeof(connObj), CONN_KEEP_TIME); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return pConn; @@ -115,10 +115,10 @@ void mnodeReleaseConn(SConnObj *pConn) { SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { char key[10]; - sprintf(key, "%u", connId); + int32_t len = sprintf(key, "%u", connId); uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); - SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime); + SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, len, expireTime); if (pConn == NULL) { mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return NULL; @@ -547,7 +547,7 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); - SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); if (pConn == NULL) { mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); return TSDB_CODE_MND_INVALID_CONN_ID; @@ -577,7 +577,7 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); - SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); if (pConn == NULL) { mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); return TSDB_CODE_MND_INVALID_CONN_ID; @@ -594,7 +594,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS; SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; - SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId); + SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, pKill->queryId, strlen(pKill->queryId)); if (pConn == NULL) { mError("connId:%s, failed to kill, conn not exist", pKill->queryId); return TSDB_CODE_MND_INVALID_CONN_ID; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 6f5216dfa3..8999ce4083 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -65,7 +65,7 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsMnodeShowCache = taosCacheInitWithCb(5, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, mnodeFreeShowObj); return 0; } @@ -364,9 +364,9 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { static bool mnodeAccquireShowObj(SShowObj *pShow) { char key[10]; - sprintf(key, "%d", pShow->index); + int32_t len = sprintf(key, "%d", pShow->index); - SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key); + SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, key, len); if (pSaved == pShow) { mDebug("%p, show is accquired from cache", pShow); return true; @@ -379,9 +379,9 @@ static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { if (tsMnodeShowCache != NULL) { char key[10]; pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); - sprintf(key, "%d", pShow->index); + int32_t len = sprintf(key, "%d", pShow->index); - SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 6); + SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, len, pShow, size, 6); free(pShow); mDebug("%p, show is put into cache, index:%s", newQhandle, key); diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index b078be5930..7fb9720170 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) { } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext); + tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 2, false, httpDestroyContext); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; @@ -104,14 +104,14 @@ HttpContext *httpCreateContext(int32_t fd) { if (pContext == NULL) return NULL; char contextStr[16] = {0}; - snprintf(contextStr, sizeof(contextStr), "%p", pContext); + int32_t keySize = snprintf(contextStr, sizeof(contextStr), "%p", pContext); pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; - HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3); + HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, keySize, &pContext, sizeof(HttpContext *), 3); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext); @@ -123,9 +123,9 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) { char contextStr[16] = {0}; - snprintf(contextStr, sizeof(contextStr), "%p", ptr); + int32_t len = snprintf(contextStr, sizeof(contextStr), "%p", ptr); - HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr); + HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, contextStr, len); if (ppContext) { HttpContext *pContext = *ppContext; diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index 83602e1291..3a901167d5 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -33,9 +33,9 @@ void httpCreateSession(HttpContext *pContext, void *taos) { memset(&session, 0, sizeof(HttpSession)); session.taos = taos; session.refCount = 1; - snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire); + pContext->session = taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire); // void *temp = pContext->session; // taosCacheRelease(server->sessionCache, (void **)&temp, false); @@ -57,9 +57,9 @@ static void httpFetchSessionImp(HttpContext *pContext) { pthread_mutex_lock(&server->serverMutex); char sessionId[HTTP_SESSION_ID_LEN]; - snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); + pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len); if (pContext->session != NULL) { atomic_add_fetch_32(&pContext->session->refCount, 1); httpDebug("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, @@ -115,7 +115,7 @@ void httpCleanUpSessions() { } bool httpInitSessions() { - tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession); + tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession); if (tsHttpServer.sessionCache == NULL) { httpError("failed to init session cache"); return false; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a830723845..d7d59230b6 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -27,6 +27,7 @@ #include "tref.h" #include "tsdb.h" #include "tsqlfunction.h" +#include "query.h" struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); @@ -181,13 +182,13 @@ typedef struct SQueryRuntimeEnv { } SQueryRuntimeEnv; typedef struct SQInfo { - void* signature; - int32_t pointsInterpo; - int32_t code; // error code to returned to client - sem_t dataReady; - void* tsdb; - int32_t vgId; - + void* signature; + int32_t pointsInterpo; + int32_t code; // error code to returned to client + sem_t dataReady; + void* tsdb; + void* param; + int32_t vgId; STableGroupInfo tableGroupInfo; // table id list < only includes the STable list> STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; @@ -202,8 +203,9 @@ typedef struct SQInfo { * We later may refactor to remove this attribution by using another flag to denote * whether a multimeter query is completed or not. */ - int32_t tableIndex; - int32_t numOfGroupResultPages; + int32_t tableIndex; + int32_t numOfGroupResultPages; + _qinfo_free_fn_t fn; } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9df161b3dc..5f5242cefd 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4069,7 +4069,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { +int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery, void* freeParam, _qinfo_free_fn_t fn) { int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4083,14 +4083,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; + pQInfo->param = freeParam; + pQInfo->fn = fn; pRuntimeEnv->pQuery = pQuery; - pRuntimeEnv->pTSBuf = param; + pRuntimeEnv->pTSBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; - if (param != NULL) { + if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } @@ -5697,8 +5699,7 @@ static bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } - -static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) { +static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -5731,7 +5732,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable, param, fn)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -5894,7 +5895,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { return TSDB_CODE_SUCCESS; } -int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, + qinfo_t* pQInfo) { assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -5984,7 +5986,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi goto _over; } - code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery); + code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn); _over: free(tagCond); @@ -6020,7 +6022,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) { freeQInfo(pQInfo); } -void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { +void qDestroyQueryInfo(qinfo_t qHandle) { SQInfo* pQInfo = (SQInfo*) qHandle; if (!isValidQInfo(pQInfo)) { return; @@ -6030,11 +6032,12 @@ void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); if (ref == 0) { - doDestoryQueryInfo(pQInfo); - - if (fp != NULL) { - fp(param); + if (pQInfo->fn != NULL) { + assert(pQInfo->param != NULL); + pQInfo->fn(pQInfo->param); } + + doDestoryQueryInfo(pQInfo); } } @@ -6048,7 +6051,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); - qDestroyQueryInfo(pQInfo, fp, param); + qDestroyQueryInfo(pQInfo); return; } @@ -6069,7 +6072,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { } sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo, fp, param); + qDestroyQueryInfo(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -6162,7 +6165,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return code; } -int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { +int32_t qKillQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { @@ -6170,8 +6173,7 @@ int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { } setQueryKilled(pQInfo); - qDestroyQueryInfo(pQInfo, fp, param); - + qDestroyQueryInfo(pQInfo); return TSDB_CODE_SUCCESS; } diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 17b3823831..cd3d0d436f 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -24,6 +24,8 @@ extern "C" { #include "tref.h" #include "hash.h" +typedef void (*__cache_freeres_fn_t)(void*); + typedef struct SCacheStatis { int64_t missCount; int64_t hitCount; @@ -34,14 +36,15 @@ typedef struct SCacheStatis { typedef struct SCacheDataNode { uint64_t addedTime; // the added time when this element is added or updated into cache - uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache + uint64_t lifespan; // expiredTime expiredTime when this element should be remove from cache uint64_t signature; uint32_t size; // allocated size for current SCacheDataNode - uint16_t keySize: 15; - bool inTrashCan: 1;// denote if it is in trash or not T_REF_DECLARE() - char *key; - char data[]; + uint16_t keySize: 15; // max key size: 32kb + bool inTrashCan: 1;// denote if it is in trash or not + int32_t extendFactor; // number of life span extend + char *key; + char data[]; } SCacheDataNode; typedef struct STrashElem { @@ -62,29 +65,32 @@ typedef struct { int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t refreshTime; STrashElem * pTrash; - void * tmrCtrl; - void * pTimer; +// void * tmrCtrl; +// void * pTimer; SCacheStatis statistics; SHashObj * pHashTable; - _hash_free_fn_t freeFp; + __cache_freeres_fn_t freeFp; uint32_t numOfElemsInTrash; // number of element in trash uint8_t deleting; // set the deleting flag to stop refreshing ASAP. pthread_t refreshWorker; - + bool extendLifespan; // auto extend life span when one item is accessed. #if defined(LINUX) pthread_rwlock_t lock; #else - pthread_mutex_t lock; + pthread_mutex_t lock; #endif } SCacheObj; /** * initialize the cache object - * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and - * not referenced by other objects + * @param keyType key type + * @param refreshTimeInSeconds refresh operation interval time, the maximum survival time when one element is expired + * and not referenced by other objects + * @param extendLifespan auto extend lifespan, if accessed + * @param fn free resource callback function * @return */ -SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds); +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); /** * initialize the cache object and set the free object callback function @@ -92,7 +98,7 @@ SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds); * @param freeCb * @return */ -SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void *data)); +SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); /** * add data into cache @@ -104,7 +110,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void * @param keepTime survival time in second * @return cached element */ -void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int keepTimeInSeconds); +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int keepTimeInSeconds); /** * get data from cache @@ -112,22 +118,23 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz * @param key key * @return cached data or NULL */ -void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); +void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen); /** * update the expire time of data in cache * @param pCacheObj cache object * @param key key + * @param keyLen keyLen * @param expireTime new expire time of data * @return */ -void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime); +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime); /** * Add one reference count for the exist data, and assign this data for a new owner. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. - * This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of - * the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again. + * This procedure is a faster version of taosCacheAcquireByKey function, which avoids the sideeffect of the problem of + * the data is moved to trash, and taosCacheAcquireByKey will fail to retrieve it again. * * @param handle * @param data @@ -148,8 +155,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data); * if it is referenced by other object, it will be remain in cache * @param handle cache object * @param data not the key, actually referenced data - * @param _remove force model, reduce the ref count and move the data into - * pTrash + * @param _remove force model, reduce the ref count and move the data into pTrash */ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2dd641731c..91d559a92b 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -167,7 +167,7 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode // update the timestamp information for updated key/value pNewNode->addedTime = taosGetTimestampMs(); - pNewNode->expiredTime = pNewNode->addedTime + duration; + pNewNode->lifespan = duration; T_REF_INC(pNewNode); @@ -224,8 +224,8 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); */ static void* taosCacheRefresh(void *handle); -SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) { - if (refreshTime <= 0) { +SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { + if (refreshTimeInSeconds <= 0) { return NULL; } @@ -235,7 +235,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return NULL; } - pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); + pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false); if (pCacheObj->pHashTable == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); @@ -243,10 +243,9 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) } // set free cache node callback function for hash table - // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); - - pCacheObj->freeFp = freeCb; - pCacheObj->refreshTime = refreshTime * 1000; + pCacheObj->freeFp = fn; + pCacheObj->refreshTime = refreshTimeInSeconds * 1000; + pCacheObj->extendLifespan = extendLifespan; if (__cache_lock_init(pCacheObj) != 0) { taosHashCleanup(pCacheObj->pHashTable); @@ -256,7 +255,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return NULL; } - pthread_attr_t thattr; + pthread_attr_t thattr = {0}; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -266,19 +265,17 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) return pCacheObj; } -SCacheObj *taosCacheInit(int64_t refreshTime) { - return taosCacheInitWithCb(refreshTime, NULL); +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { + return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn); } -void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) { +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { SCacheDataNode *pNode; if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) { return NULL; } - size_t keyLen = strlen(key); - __cache_wr_lock(pCacheObj); SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL; @@ -288,14 +285,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz if (NULL != pNode) { pCacheObj->totalSize += pNode->size; - uDebug("key:%s %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", - key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize); + uDebug("key:%p %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", + key, pNode, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->totalSize, dataSize); } else { - uError("key:%s failed to added into cache, out of memory", key); + uError("key:%p failed to added into cache, out of memory", key); } } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); - uDebug("key:%s %p exist in cache, updated", key, pNode); + uDebug("key:%p %p exist in cache, updated", key, pNode); } __cache_unlock(pCacheObj); @@ -303,47 +300,55 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz return (pNode != NULL) ? pNode->data : NULL; } -void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { +void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { return NULL; } - uint32_t keyLen = (uint32_t)strlen(key); - __cache_rd_lock(pCacheObj); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); + + int32_t ref = 0; if (ptNode != NULL) { - T_REF_INC(*ptNode); + ref = T_REF_INC(*ptNode); + + // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan + if (pCacheObj->extendLifespan) { + int64_t now = taosGetTimestampMs(); + + if ((now - (*ptNode)->addedTime) < (*ptNode)->lifespan * (*ptNode)->extendFactor) { + (*ptNode)->extendFactor += 1; + uDebug("key:%p extend life time to %"PRId64, key, (*ptNode)->lifespan * (*ptNode)->extendFactor + (*ptNode)->addedTime); + } + } } - __cache_unlock(pCacheObj); if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + uDebug("key:%p is retrieved from cache, %p refcnt:%d", key, (*ptNode), ref); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%s not in cache, retrieved failed", key); + uDebug("key:%p not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); return (ptNode != NULL) ? (*ptNode)->data : NULL; } -void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) { +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { return NULL; } - - uint32_t keyLen = (uint32_t)strlen(key); - + __cache_rd_lock(pCacheObj); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); if (ptNode != NULL) { T_REF_INC(*ptNode); - (*ptNode)->expiredTime = expireTime; + (*ptNode)->extendFactor += 1; +// (*ptNode)->lifespan = expireTime; } __cache_unlock(pCacheObj); @@ -373,7 +378,17 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { int32_t ref = T_REF_INC(ptNode); uDebug("%p acquired by data in cache, refcnt:%d", ptNode, ref) - + + // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan + if (pCacheObj->extendLifespan) { + int64_t now = taosGetTimestampMs(); + + if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { + ptNode->extendFactor += 1; + uDebug("key:%p extend life time to %"PRId64, ptNode, ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); + } + } + // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); return data; @@ -408,7 +423,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); - if (pNode->signature != (uint64_t)pNode) { uError("key: %p release invalid cache data", pNode); return; @@ -420,9 +434,16 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (_remove) { __cache_wr_lock(pCacheObj); - // pNode may be released immediately by other thread after the reference count of pNode is set to 0, - // So we need to lock it in the first place. - taosCacheMoveToTrash(pCacheObj, pNode); + + if (T_REF_VAL_GET(pNode) == 0) { + // remove directly, if not referenced by other users + taosCacheReleaseNode(pCacheObj, pNode); + } else { + // pNode may be released immediately by other thread after the reference count of pNode is set to 0, + // So we need to lock it in the first place. + taosCacheMoveToTrash(pCacheObj, pNode); + } + __cache_unlock(pCacheObj); } } @@ -473,11 +494,11 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * memcpy(pNewNode->key, key, keyLen); - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->expiredTime = pNewNode->addedTime + duration; - - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)totalSize; + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->lifespan = duration; + pNewNode->extendFactor = 1; + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)totalSize; return pNewNode; } @@ -501,7 +522,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pNode->inTrashCan = true; pCacheObj->numOfElemsInTrash++; - uDebug("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); + uDebug("key:%p %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); } void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { @@ -522,7 +543,10 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { } pElem->pData->signature = 0; - if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data); + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pElem->pData->data); + } + free(pElem->pData); free(pElem); } @@ -549,7 +573,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { } if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { - uDebug("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, + uDebug("key:%p %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, pCacheObj->numOfElemsInTrash - 1); STrashElem *p = pElem; @@ -569,9 +593,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); while (taosHashIterNext(pIter)) { SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); - //} } taosHashDestroyIter(pIter); @@ -610,27 +632,32 @@ void* taosCacheRefresh(void *handle) { // reset the count value count = 0; - size_t num = taosHashGetSize(pCacheObj->pHashTable); - if (num == 0) { + size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable); + if (elemInHash + pCacheObj->numOfElemsInTrash == 0) { continue; } - uint64_t expiredTime = taosGetTimestampMs(); pCacheObj->statistics.refreshCount++; - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + // refresh data in hash table + if (elemInHash > 0) { + int64_t expiredTime = taosGetTimestampMs(); - __cache_wr_lock(pCacheObj); - while (taosHashIterNext(pIter)) { - SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + + __cache_wr_lock(pCacheObj); + while (taosHashIterNext(pIter)) { + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } } - } - __cache_unlock(pCacheObj); + __cache_unlock(pCacheObj); + + taosHashDestroyIter(pIter); + } - taosHashDestroyIter(pIter); taosTrashCanEmpty(pCacheObj, false); } diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index e428eae688..76e53f3962 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -53,6 +53,7 @@ typedef struct { STsdbCfg tsdbCfg; SSyncCfg syncCfg; SWalCfg walCfg; + void *qHandlePool; // query handle pool char *rootDir; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 4693dc02d3..49bc35e913 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -15,19 +15,22 @@ #define _DEFAULT_SOURCE #include "os.h" + +#include "tcache.h" +#include "cJSON.h" +#include "dnode.h" #include "hash.h" #include "taoserror.h" #include "taosmsg.h" -#include "tutil.h" +#include "tglobal.h" #include "trpc.h" #include "tsdb.h" #include "ttime.h" #include "ttimer.h" -#include "cJSON.h" -#include "tglobal.h" -#include "dnode.h" +#include "tutil.h" #include "vnode.h" #include "vnodeInt.h" +#include "query.h" #define TSDB_VNODE_VERSION_CONTENT_LEN 31 @@ -43,6 +46,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static void vnodeFreeqHandle(void* phandle); static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -280,6 +284,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); + const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool + pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle); + pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); @@ -848,12 +855,12 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) { goto PARSE_OVER; } - cJSON *version = cJSON_GetObjectItem(root, "version"); - if (!version || version->type != cJSON_Number) { + cJSON *ver = cJSON_GetObjectItem(root, "version"); + if (!ver || ver->type != cJSON_Number) { vError("vgId:%d, failed to read vnode version, version not found", pVnode->vgId); goto PARSE_OVER; } - pVnode->version = version->valueint; + pVnode->version = ver->valueint; terrno = TSDB_CODE_SUCCESS; vInfo("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); @@ -864,3 +871,12 @@ PARSE_OVER: if(fp) fclose(fp); return terrno; } + +void vnodeFreeqHandle(void *qHandle) { + void** handle = qHandle; + if (handle == NULL || *handle == NULL) { + return; + } + + qKillQuery(*handle); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 0c08c77e32..52adffff7e 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -15,20 +15,21 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" + +#include "tglobal.h" #include "taoserror.h" -#include "tqueue.h" +#include "taosmsg.h" +#include "tcache.h" +#include "query.h" #include "trpc.h" #include "tsdb.h" -#include "twal.h" -#include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" -#include "query.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; @@ -58,19 +59,6 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -// notify connection(handle) that current qhandle is created, if current connection from -// client is broken, the query needs to be killed immediately. -static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { - SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); - killQueryMsg->qhandle = htobe64((uint64_t) qhandle); - killQueryMsg->free = htons(1); - killQueryMsg->header.vgId = htonl(vgId); - killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); - - vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); - return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); -} - static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -88,18 +76,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - // this message arrived here by means of the query message, so release the vnode is necessary - qKillQuery((qinfo_t) killQueryMsg->qhandle, vnodeRelease, pVnode); - vnodeRelease(pVnode); + // this message arrived here by means of the *query* message, so release the vnode is necessary + void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); + if (qhandle == NULL || *qhandle == NULL) { // todo handle invalid qhandle error + } else { +// qKillQuery((qinfo_t) killQueryMsg->qhandle); + taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); + } + + vnodeRelease(pVnode); return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code } int32_t code = TSDB_CODE_SUCCESS; qinfo_t pQInfo = NULL; + void** handle = NULL; if (contLen != 0) { - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); @@ -116,13 +111,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // NOTE: there two refcount, needs to kill twice, todo refactor - qKillQuery(pQInfo, vnodeRelease, pVnode); - qKillQuery(pQInfo, vnodeRelease, pVnode); + // query has not been put into qhandle pool, kill it directly. + qKillQuery(pQInfo); + qKillQuery(pQInfo); return pRsp->code; } - vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); + handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); + assert(*handle == pQInfo); } else { assert(pQInfo == NULL); vnodeRelease(pVnode); @@ -138,6 +135,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pQInfo != NULL) { qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query + taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); } return code; @@ -152,10 +150,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRetrieve->free = htons(pRetrieve->free); memset(pRet, 0, sizeof(SRspRet)); + int32_t ret = 0; + + void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, &pQInfo, sizeof(pQInfo)); + if (handle == NULL || *handle != pQInfo) { + ret = TSDB_CODE_QRY_INVALID_QHANDLE; + } if (pRetrieve->free == 1) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - int32_t ret = qKillQuery(pQInfo, vnodeRelease, pVnode); + + taosCacheRelease(pVnode->qHandlePool, handle, true); +// int32_t ret = qKillQuery(pQInfo); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -184,10 +190,24 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; } else { // no further execution invoked, release the ref to vnode - qDestroyQueryInfo(pQInfo, vnodeRelease, pVnode); + taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); +// qDestroyQueryInfo(pQInfo); } } vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); return code; } + +// notify connection(handle) that current qhandle is created, if current connection from +// client is broken, the query needs to be killed immediately. +int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { + SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + killQueryMsg->qhandle = htobe64((uint64_t) qhandle); + killQueryMsg->free = htons(1); + killQueryMsg->header.vgId = htonl(vgId); + killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + + vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); + return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); +} \ No newline at end of file -- GitLab