diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index 947d0fa501e9c9953cac969f4b364e1b318549fa..acd92db5989fb82c83073b851e8df90dbe767973 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
- if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) {
- pVnode = vnodeGetVnode(pHead->vgId);
- } else {
- pVnode = vnodeAccquireVnode(pHead->vgId);
- }
+ pVnode = vnodeAccquireVnode(pHead->vgId);
if (pVnode == NULL) {
leftLen -= pHead->contLen;
@@ -189,24 +185,7 @@ void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) {
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
-static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
- SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
- pRead->rpcMsg = pMsg->rpcMsg;
- pRead->pCont = qhandle;
- pRead->contLen = 0;
- pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
-
- taos_queue queue = vnodeGetRqueue(pVnode);
- taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
-}
-
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
- if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
- if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
- dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
- code = TSDB_CODE_SUCCESS;
- }
-
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
@@ -216,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
+ vnodeRelease(pVnode);
+}
+
+void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
+ vnodeRelease(pVnode);
+ return;
}
static void *dnodeProcessReadQueue(void *param) {
@@ -235,6 +220,8 @@ static void *dnodeProcessReadQueue(void *param) {
if (type == TAOS_QTYPE_RPC) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
+ } else {
+ dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
taosFreeQitem(pReadMsg);
diff --git a/src/inc/query.h b/src/inc/query.h
index eb8abace6278bb96c6ab5b4984735b09f347923e..88badc2d7b5a849114ee4437855d7cfe1c51c1d8 100644
--- a/src/inc/query.h
+++ b/src/inc/query.h
@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
+void* qOpenQueryMgmt(int32_t vgId);
+void qSetQueryMgmtClosed(void* pExecutor);
+void qCleanupQueryMgmt(void* pExecutor);
+void** qRegisterQInfo(void* pMgmt, void* qInfo);
+void** qAcquireQInfo(void* pMgmt, void** key);
+void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 8156967d5d9f6ea726391eac582bffb5d01f851b..06578916bb2f06f72183920c3dddf447835d3907 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -13,8 +13,10 @@
* along with this program. If not, see .
*/
#include "os.h"
-#include "taosmsg.h"
+#include "tcache.h"
+#include "tglobal.h"
#include "qfill.h"
+#include "taosmsg.h"
#include "hash.h"
#include "qExecutor.h"
@@ -1520,7 +1522,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
static bool isQueryKilled(SQInfo *pQInfo) {
- return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
@@ -5910,9 +5911,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return TSDB_CODE_SUCCESS;
}
+typedef struct SQueryMgmt {
+ SCacheObj *qinfoPool; // query handle pool
+ int32_t vgId;
+ bool closed;
+ pthread_mutex_t lock;
+} SQueryMgmt;
+
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) {
- assert(pQueryMsg != NULL);
+ assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
@@ -6356,3 +6364,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
+void freeqinfoFn(void *qhandle) {
+ void** handle = qhandle;
+ if (handle == NULL || *handle == NULL) {
+ return;
+ }
+
+ qKillQuery(*handle);
+}
+
+void* qOpenQueryMgmt(int32_t vgId) {
+ const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool
+
+ char cacheName[128] = {0};
+ sprintf(cacheName, "qhandle_%d", vgId);
+
+ SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
+
+ pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
+ pQueryHandle->closed = false;
+ pthread_mutex_init(&pQueryHandle->lock, NULL);
+
+ qDebug("vgId:%d, open querymgmt success", vgId);
+ return pQueryHandle;
+}
+
+void qSetQueryMgmtClosed(void* pQMgmt) {
+ if (pQMgmt == NULL) {
+ return;
+ }
+
+ SQueryMgmt* pQueryMgmt = pQMgmt;
+ qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
+
+ pthread_mutex_lock(&pQueryMgmt->lock);
+ pQueryMgmt->closed = true;
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ taosCacheEmpty(pQueryMgmt->qinfoPool, true);
+}
+
+void qCleanupQueryMgmt(void* pQMgmt) {
+ if (pQMgmt == NULL) {
+ return;
+ }
+
+ SQueryMgmt* pQueryMgmt = pQMgmt;
+ int32_t vgId = pQueryMgmt->vgId;
+
+ assert(pQueryMgmt->closed);
+
+ SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
+ pQueryMgmt->qinfoPool = NULL;
+
+ taosCacheCleanup(pqinfoPool);
+ pthread_mutex_destroy(&pQueryMgmt->lock);
+ tfree(pQueryMgmt);
+
+ qDebug("vgId:%d querymgmt cleanup completed", vgId);
+}
+
+void** qRegisterQInfo(void* pMgmt, void* qInfo) {
+ if (pMgmt == NULL) {
+ return NULL;
+ }
+
+ SQueryMgmt *pQueryMgmt = pMgmt;
+ if (pQueryMgmt->qinfoPool == NULL) {
+ return NULL;
+ }
+
+ pthread_mutex_lock(&pQueryMgmt->lock);
+ if (pQueryMgmt->closed) {
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ return NULL;
+ } else {
+ void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
+ pthread_mutex_unlock(&pQueryMgmt->lock);
+
+ return handle;
+ }
+}
+
+void** qAcquireQInfo(void* pMgmt, void** key) {
+ SQueryMgmt *pQueryMgmt = pMgmt;
+
+ if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
+ return NULL;
+ }
+
+ void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
+ if (handle == NULL || *handle == NULL) {
+ return NULL;
+ } else {
+ return handle;
+ }
+}
+
+void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) {
+ SQueryMgmt *pQueryMgmt = pMgmt;
+
+ if (pQueryMgmt->qinfoPool == NULL) {
+ return NULL;
+ }
+
+ taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree);
+ return 0;
+}
+
diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h
index 76e53f3962ed55cf1a3aee875d10b83b22ac9c37..4f22c7784d535b4190bf9b27e76f9ceed684211c 100644
--- a/src/vnode/inc/vnodeInt.h
+++ b/src/vnode/inc/vnodeInt.h
@@ -53,7 +53,7 @@ typedef struct {
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
- void *qHandlePool; // query handle pool
+ void *qMgmt;
char *rootDir;
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 4c446a78ec8448668e1391388efa5a39939b9638..6ccdc02acf1ce9f92f3af115ca64175ca952cbe5 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
-static void vnodeFreeqHandle(void* phandle);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
@@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
- const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool
- pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
-
+ pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
@@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) {
return;
}
+ qCleanupQueryMgmt(pVnode->qMgmt);
+ pVnode->qMgmt = NULL;
+
if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
@@ -475,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections
- taosCacheCleanup(pVnode->qHandlePool);
+ qSetQueryMgmtClosed(pVnode->qMgmt);
vnodeRelease(pVnode);
}
@@ -881,12 +881,3 @@ PARSE_OVER:
if(fp) fclose(fp);
return terrno;
}
-
-void vnodeFreeqHandle(void *qHandle) {
- void** handle = qHandle;
- if (handle == NULL || *handle == NULL) {
- return;
- }
-
- qKillQuery(*handle);
-}
\ No newline at end of file
diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c
index 48d2fa6878945f799f240918037c167466ce191d..3105d58aea06691efeac65e0a68f0430ee457546 100644
--- a/src/vnode/src/vnodeRead.c
+++ b/src/vnode/src/vnodeRead.c
@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
+#include
#include "os.h"
#include "tglobal.h"
@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
- vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
+ void* handle = NULL;
+ if ((void**) killQueryMsg->qhandle != NULL) {
+ handle = *(void**) killQueryMsg->qhandle;
+ }
+
+ vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
- // this message arrived here by means of the *query* message, so release the vnode is necessary
- void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
+ void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else {
- taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
+ assert(qhandle == (void**) killQueryMsg->qhandle);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
}
- vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED;
}
@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = NULL;
if (contLen != 0) {
- code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
+ code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
@@ -104,25 +109,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
- if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
- vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
- pReadMsg->rpcMsg.handle);
- pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
+ // add lock here
+ handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
+ if (handle == NULL) { // failed to register qhandle
+ pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
- // NOTE: there two refcount, needs to kill twice, todo refactor
- // query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo);
qKillQuery(pQInfo);
+ } else {
+ assert(*handle == pQInfo);
+ pRsp->qhandle = htobe64((uint64_t) (handle));
+ }
+
+ if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
+ vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
+ pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
+ // NOTE: there two refcount, needs to kill twice
+ // query has not been put into qhandle pool, kill it directly.
+ qKillQuery(pQInfo);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code;
}
-
- handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
- assert(*handle == pQInfo);
- pRsp->qhandle = htobe64((uint64_t) (handle));
} else {
assert(pQInfo == NULL);
- vnodeRelease(pVnode);
}
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
@@ -137,9 +147,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pQInfo != NULL) {
qTableQuery(pQInfo); // do execute query
-
assert(handle != NULL);
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
}
return code;
@@ -158,7 +167,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0;
- void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
+ void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE;
}
@@ -166,8 +175,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) {
if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
@@ -177,30 +186,30 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->completed = true;
pRsp->useconds = 0;
} else { // todo handle error
-
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
return ret;
}
- vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
-
int32_t code = qRetrieveQueryResultInfo(*pQInfo);
- if (code != TSDB_CODE_SUCCESS) {
+ if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
+
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
- if (qHasMoreResultsToRetrieve(*pQInfo)) {
+ if (qHasMoreResultsToRetrieve(*handle)) {
+ dnodePutQhandleIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
- code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
+ code = TSDB_CODE_SUCCESS;
} else { // no further execution invoked, release the ref to vnode
- taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
+ qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
}
-
+
return code;
}