diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c
index 83700ce0a573ccf15a474a58ec4ebfda2634e2fd..1d66fb046745af172fdd190732d2e5200f250b0f 100644
--- a/src/client/src/tscLocal.c
+++ b/src/client/src/tscLocal.c
@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
- taosCacheEmpty(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle,false);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index b7c0d43ef03602f09f2a570f0e3828d1d0e0ff48..d73983e77c704e66c1c0cf491c6b8205d749a570 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -1950,7 +1950,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
- taosCacheEmpty(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle, false);
return 0;
}
@@ -1996,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
if (isSuperTable) { // if it is a super table, reset whole query cache
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
- taosCacheEmpty(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle, false);
}
}
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index 6bbb291b6aba169710d2e564489d5bda220d6e02..acd92db5989fb82c83073b851e8df90dbe767973 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
- if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) {
- pVnode = vnodeGetVnode(pHead->vgId);
- } else {
- pVnode = vnodeAccquireVnode(pHead->vgId);
- }
+ pVnode = vnodeAccquireVnode(pHead->vgId);
if (pVnode == NULL) {
leftLen -= pHead->contLen;
@@ -179,24 +175,17 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
-static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
+void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
- pRead->rpcMsg = pMsg->rpcMsg;
- pRead->pCont = qhandle;
- pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
+ pRead->pCont = qhandle;
+ pRead->contLen = 0;
- taos_queue queue = vnodeGetRqueue(pVnode);
- taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
+ taos_queue queue = vnodeAccquireRqueue(pVnode);
+ taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
- if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
- if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
- dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
- code = TSDB_CODE_SUCCESS;
- }
-
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
@@ -206,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
+ vnodeRelease(pVnode);
+}
+
+void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
+ vnodeRelease(pVnode);
+ return;
}
static void *dnodeProcessReadQueue(void *param) {
@@ -219,9 +214,16 @@ static void *dnodeProcessReadQueue(void *param) {
break;
}
- dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
+ dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle,
+ taosMsg[pReadMsg->rpcMsg.msgType], type);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
- dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
+
+ if (type == TAOS_QTYPE_RPC) {
+ dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
+ } else {
+ dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
+ }
+
taosFreeQitem(pReadMsg);
}
diff --git a/src/inc/dnode.h b/src/inc/dnode.h
index b561c407a3415d7db27333d96e21a72d4f159d8b..1d33dafbaad8aecc1c74a84b7578efc8f39aacbc 100644
--- a/src/inc/dnode.h
+++ b/src/inc/dnode.h
@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
+void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
diff --git a/src/inc/query.h b/src/inc/query.h
index eb8abace6278bb96c6ab5b4984735b09f347923e..88badc2d7b5a849114ee4437855d7cfe1c51c1d8 100644
--- a/src/inc/query.h
+++ b/src/inc/query.h
@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
+void* qOpenQueryMgmt(int32_t vgId);
+void qSetQueryMgmtClosed(void* pExecutor);
+void qCleanupQueryMgmt(void* pExecutor);
+void** qRegisterQInfo(void* pMgmt, void* qInfo);
+void** qAcquireQInfo(void* pMgmt, void** key);
+void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h
index 76ca99c9ad26bb9228cd1d94cf463657bf1a5f8f..e4ee058cef2987819640a43768edf7f483b8c1bc 100644
--- a/src/inc/taosdef.h
+++ b/src/inc/taosdef.h
@@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3
+#define TAOS_QTYPE_QUERY 4
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index 9f0c8cc24171184607b93ddaab30142ff29c4e7d..49bd67a04f1822efa53ce1c7eff15480c95f4ab9 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode);
void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
+void* vnodeAccquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode);
diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c
index d4765d4c9ef71fa04cce47a68ea30ff8bee60fd5..af4a09a45a1fc314a648ec528cb4caaf9860df7a 100644
--- a/src/mnode/src/mnodeProfile.c
+++ b/src/mnode/src/mnodeProfile.c
@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
- tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
+ tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
return 0;
}
diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c
index 64711f5e554c614795ade05843b9c7fb842e15c1..97ffe839142d5018d7256303fb8247d89f1c7400 100644
--- a/src/mnode/src/mnodeShow.c
+++ b/src/mnode/src/mnodeShow.c
@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
- tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
+ tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
return 0;
}
diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c
index ae331a7d4440607c37effbedb899f3c514f93c64..cdaee53c38a480ed5479e2ebf27efc139b677498 100644
--- a/src/plugins/http/src/httpContext.c
+++ b/src/plugins/http/src/httpContext.c
@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
}
bool httpInitContexts() {
- tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
+ tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c
index 14bb6da98395bb2b67530e1c85775907f35f72e2..256b0c9549d850fffedf4e12fd715cf4ececaa98 100644
--- a/src/plugins/http/src/httpSession.c
+++ b/src/plugins/http/src/httpSession.c
@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
}
bool httpInitSessions() {
- tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
+ tsHttpServer.sessionCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 8156967d5d9f6ea726391eac582bffb5d01f851b..1882aa1850754d8da3c84dd0b50fe68569d91d93 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -13,8 +13,10 @@
* along with this program. If not, see .
*/
#include "os.h"
-#include "taosmsg.h"
+#include "tcache.h"
+#include "tglobal.h"
#include "qfill.h"
+#include "taosmsg.h"
#include "hash.h"
#include "qExecutor.h"
@@ -87,16 +89,18 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
+#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
-// uint32_t v = rand();
-// if (v % 5 <= 1) {
-// return NULL;
-// } else {
+ uint32_t v = rand();
+ if (v % 5 <= 1) {
+ return NULL;
+ } else {
return malloc(__size);
-// }
+ }
}
#define malloc u_malloc
+#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
@@ -1520,7 +1524,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
static bool isQueryKilled(SQInfo *pQInfo) {
- return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
@@ -5910,9 +5913,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return TSDB_CODE_SUCCESS;
}
+typedef struct SQueryMgmt {
+ SCacheObj *qinfoPool; // query handle pool
+ int32_t vgId;
+ bool closed;
+ pthread_mutex_t lock;
+} SQueryMgmt;
+
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) {
- assert(pQueryMsg != NULL);
+ assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
@@ -6356,3 +6366,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
+void freeqinfoFn(void *qhandle) {
+ void** handle = qhandle;
+ if (handle == NULL || *handle == NULL) {
+ return;
+ }
+
+ qKillQuery(*handle);
+}
+
+void* qOpenQueryMgmt(int32_t vgId) {
+ const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool
+
+ char cacheName[128] = {0};
+ sprintf(cacheName, "qhandle_%d", vgId);
+
+ SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
+
+ pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
+ pQueryHandle->closed = false;
+ pthread_mutex_init(&pQueryHandle->lock, NULL);
+
+ qDebug("vgId:%d, open querymgmt success", vgId);
+ return pQueryHandle;
+}
+
+void qSetQueryMgmtClosed(void* pQMgmt) {
+ if (pQMgmt == NULL) {
+ return;
+ }
+
+ SQueryMgmt* pQueryMgmt = pQMgmt;
+ qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
+
+ pthread_mutex_lock(&pQueryMgmt->lock);
+ pQueryMgmt->closed = true;
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ taosCacheEmpty(pQueryMgmt->qinfoPool, true);
+}
+
+void qCleanupQueryMgmt(void* pQMgmt) {
+ if (pQMgmt == NULL) {
+ return;
+ }
+
+ SQueryMgmt* pQueryMgmt = pQMgmt;
+ int32_t vgId = pQueryMgmt->vgId;
+
+ assert(pQueryMgmt->closed);
+
+ SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
+ pQueryMgmt->qinfoPool = NULL;
+
+ taosCacheCleanup(pqinfoPool);
+ pthread_mutex_destroy(&pQueryMgmt->lock);
+ tfree(pQueryMgmt);
+
+ qDebug("vgId:%d querymgmt cleanup completed", vgId);
+}
+
+void** qRegisterQInfo(void* pMgmt, void* qInfo) {
+ if (pMgmt == NULL) {
+ return NULL;
+ }
+
+ SQueryMgmt *pQueryMgmt = pMgmt;
+ if (pQueryMgmt->qinfoPool == NULL) {
+ return NULL;
+ }
+
+ pthread_mutex_lock(&pQueryMgmt->lock);
+ if (pQueryMgmt->closed) {
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ return NULL;
+ } else {
+ void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ return handle;
+ }
+}
+
+void** qAcquireQInfo(void* pMgmt, void** key) {
+ SQueryMgmt *pQueryMgmt = pMgmt;
+
+ if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
+ return NULL;
+ }
+
+ void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
+ if (handle == NULL || *handle == NULL) {
+ return NULL;
+ } else {
+ return handle;
+ }
+}
+
+void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) {
+ SQueryMgmt *pQueryMgmt = pMgmt;
+
+ if (pQueryMgmt->qinfoPool == NULL) {
+ return NULL;
+ }
+
+ taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree);
+ return 0;
+}
+
diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h
index 87bb1e41f7a4fd9ab2ea96ed2b6b2c1e7992d93c..b026ad43863995cadb2bd15067b145f660a2f0cd 100644
--- a/src/util/inc/tcache.h
+++ b/src/util/inc/tcache.h
@@ -65,7 +65,7 @@ typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
STrashElem * pTrash;
- const char * cacheName;
+ char* name;
// void * tmrCtrl;
// void * pTimer;
SCacheStatis statistics;
@@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
+ * @param _remove remove the data or not if refcount is greater than 0
*/
-void taosCacheEmpty(SCacheObj *pCacheObj);
+void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove);
/**
* release all allocated memory and destroy the cache object.
diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c
index 016b3521880f2ad8ce6e6c1eb290609d5788206f..d546970868a5cb0ddf69bc54eb0c92c471eeb54d 100644
--- a/src/util/src/tcache.c
+++ b/src/util/src/tcache.c
@@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
- uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s",
- pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size,
- pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
+ pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
@@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/
static void* taosCacheRefresh(void *handle);
-SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
+SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
if (refreshTimeInSeconds <= 0) {
return NULL;
}
@@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
}
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
- pCacheObj->cacheName = cacheName;
+ pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) {
free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
@@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
return pCacheObj;
}
-SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
- return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName);
-}
-
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
SCacheDataNode *pNode;
@@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if (NULL != pNode) {
pCacheObj->totalSize += pNode->size;
- uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
- "bytes size:%" PRId64 "bytes, cacheName:%s",
- key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
- (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
+ "bytes size:%" PRId64 "bytes",
+ pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
+ (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize);
} else {
- uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName);
+ uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
- uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data);
}
__cache_unlock(pCacheObj);
@@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
- uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, ref, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref);
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
- uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
@@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
- uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data,
- T_REF_VAL_GET(*ptNode), pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key,
+ (*ptNode)->data, T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
- uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
@@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
int32_t ref = T_REF_INC(ptNode);
- uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", ptNode->data, ref, pCacheObj->cacheName);
+ uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) {
@@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
ptNode->extendFactor += 1;
- uDebug("%p extend life time to %" PRId64, ptNode->data,
+ uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data,
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
}
}
@@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*data = NULL;
int16_t ref = T_REF_DEC(pNode);
- uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName);
+ uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
if (_remove && (!pNode->inTrashCan)) {
__cache_wr_lock(pCacheObj);
@@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
}
-void taosCacheEmpty(SCacheObj *pCacheObj) {
+void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pCacheObj);
@@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
}
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
- taosCacheMoveToTrash(pCacheObj, pNode);
+ if (T_REF_VAL_GET(pNode) == 0 || _remove) {
+ taosCacheReleaseNode(pCacheObj, pNode);
+ } else {
+ taosCacheMoveToTrash(pCacheObj, pNode);
+ }
}
__cache_unlock(pCacheObj);
taosHashDestroyIter(pIter);
- taosTrashCanEmpty(pCacheObj, false);
+ taosTrashCanEmpty(pCacheObj, _remove);
}
void taosCacheCleanup(SCacheObj *pCacheObj) {
@@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj->deleting = 1;
pthread_join(pCacheObj->refreshWorker, NULL);
- uInfo("cacheName:%p, will be cleanuped", pCacheObj->cacheName);
+ uInfo("cache:%s will be cleaned up", pCacheObj->name);
doCleanupDataCache(pCacheObj);
}
@@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
- // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
- if (T_REF_VAL_GET(pNode) <= 0) {
+
+ int32_t c = T_REF_VAL_GET(pNode);
+ if (c <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
- uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pNode->key, pNode->data,
- T_REF_VAL_GET(pNode), pCacheObj->cacheName);
+ uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
+ pNode->data, T_REF_VAL_GET(pNode));
}
}
taosHashDestroyIter(pIter);
- taosHashCleanup(pCacheObj->pHashTable);
+ // todo memory leak if there are object with refcount greater than 0 in hash table?
+ taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj);
-
+
+ tfree(pCacheObj->name);
memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj);
}
diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h
index 76e53f3962ed55cf1a3aee875d10b83b22ac9c37..4f22c7784d535b4190bf9b27e76f9ceed684211c 100644
--- a/src/vnode/inc/vnodeInt.h
+++ b/src/vnode/inc/vnodeInt.h
@@ -53,7 +53,7 @@ typedef struct {
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
- void *qHandlePool; // query handle pool
+ void *qMgmt;
char *rootDir;
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 5eb78fda526607ab6518c850b25c1d533b8767a2..6ccdc02acf1ce9f92f3af115ca64175ca952cbe5 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
-static void vnodeFreeqHandle(void* phandle);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
@@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
- const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool
- pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
-
+ pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
@@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) {
return;
}
+ qCleanupQueryMgmt(pVnode->qMgmt);
+ pVnode->qMgmt = NULL;
+
if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
@@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) {
return pVnode;
}
+void *vnodeAccquireRqueue(void *param) {
+ SVnodeObj *pVnode = param;
+ if (pVnode == NULL) return NULL;
+
+ atomic_add_fetch_32(&pVnode->refCount, 1);
+ vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
+ return ((SVnodeObj *)pVnode)->rqueue;
+}
+
void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue;
}
@@ -466,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections
- taosCacheCleanup(pVnode->qHandlePool);
+ qSetQueryMgmtClosed(pVnode->qMgmt);
vnodeRelease(pVnode);
}
@@ -872,12 +881,3 @@ PARSE_OVER:
if(fp) fclose(fp);
return terrno;
}
-
-void vnodeFreeqHandle(void *qHandle) {
- void** handle = qHandle;
- if (handle == NULL || *handle == NULL) {
- return;
- }
-
- qKillQuery(*handle);
-}
\ No newline at end of file
diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c
index 517d073570159304f1a3347cf2f11e1df384a9f1..2ca69a3ddb9d9460d93e313af707b52eacab3dc8 100644
--- a/src/vnode/src/vnodeRead.c
+++ b/src/vnode/src/vnodeRead.c
@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
+#include
#include "os.h"
#include "tglobal.h"
@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
- vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
+ void* handle = NULL;
+ if ((void**) killQueryMsg->qhandle != NULL) {
+ handle = *(void**) killQueryMsg->qhandle;
+ }
+
+ vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
- // this message arrived here by means of the *query* message, so release the vnode is necessary
- void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
+ void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else {
- taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
+ assert(qhandle == (void**) killQueryMsg->qhandle);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
}
- vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED;
}
@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = NULL;
if (contLen != 0) {
- code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
+ code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
@@ -105,25 +110,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
- if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
- vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
- pReadMsg->rpcMsg.handle);
- pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
+ // add lock here
+ handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
+ if (handle == NULL) { // failed to register qhandle
+ pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
- // NOTE: there two refcount, needs to kill twice, todo refactor
- // query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo);
qKillQuery(pQInfo);
+ } else {
+ assert(*handle == pQInfo);
+ pRsp->qhandle = htobe64((uint64_t) (handle));
+ }
+
+ if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
+ vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
+ pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
+ // NOTE: there two refcount, needs to kill twice
+ // query has not been put into qhandle pool, kill it directly.
+ qKillQuery(pQInfo);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code;
}
-
- handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
- assert(*handle == pQInfo);
- pRsp->qhandle = htobe64((uint64_t) (handle));
} else {
assert(pQInfo == NULL);
- vnodeRelease(pVnode);
}
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
@@ -138,9 +148,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pQInfo != NULL) {
qTableQuery(pQInfo); // do execute query
-
assert(handle != NULL);
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
}
return code;
@@ -159,7 +168,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0;
- void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
+ void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE;
}
@@ -167,8 +176,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) {
if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
@@ -178,30 +187,30 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->completed = true;
pRsp->useconds = 0;
} else { // todo handle error
-
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
return ret;
}
- vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
-
int32_t code = qRetrieveQueryResultInfo(*pQInfo);
- if (code != TSDB_CODE_SUCCESS) {
+ if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
+
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
- if (qHasMoreResultsToRetrieve(*pQInfo)) {
+ if (qHasMoreResultsToRetrieve(*handle)) {
+ dnodePutQhandleIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
- code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
+ code = TSDB_CODE_SUCCESS;
} else { // no further execution invoked, release the ref to vnode
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
}
-
+
return code;
}