提交 1f172151 编写于 作者: H Haojun Liao

[td-225] add query mgmt module & refactor the vnode ref management during query processing

上级 66805aaf
......@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) {
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
if (pVnode == NULL) {
leftLen -= pHead->contLen;
......@@ -189,24 +185,7 @@ void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) {
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
code = TSDB_CODE_SUCCESS;
}
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
......@@ -216,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
return;
}
static void *dnodeProcessReadQueue(void *param) {
......@@ -235,6 +220,8 @@ static void *dnodeProcessReadQueue(void *param) {
if (type == TAOS_QTYPE_RPC) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
taosFreeQitem(pReadMsg);
......
......@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
void* qOpenQueryMgmt(int32_t vgId);
void qSetQueryMgmtClosed(void* pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, void* qInfo);
void** qAcquireQInfo(void* pMgmt, void** key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
#ifdef __cplusplus
}
#endif
......
......@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tglobal.h"
#include "qfill.h"
#include "taosmsg.h"
#include "hash.h"
#include "qExecutor.h"
......@@ -1520,7 +1522,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
static bool isQueryKilled(SQInfo *pQInfo) {
return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5910,9 +5911,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return TSDB_CODE_SUCCESS;
}
typedef struct SQueryMgmt {
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
pthread_mutex_t lock;
} SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) {
assert(pQueryMsg != NULL);
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6356,3 +6364,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillQuery(*handle);
}
void* qOpenQueryMgmt(int32_t vgId) {
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryHandle->closed = false;
pthread_mutex_init(&pQueryHandle->lock, NULL);
qDebug("vgId:%d, open querymgmt success", vgId);
return pQueryHandle;
}
void qSetQueryMgmtClosed(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheEmpty(pQueryMgmt->qinfoPool, true);
}
void qCleanupQueryMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
int32_t vgId = pQueryMgmt->vgId;
assert(pQueryMgmt->closed);
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
pQueryMgmt->qinfoPool = NULL;
taosCacheCleanup(pqinfoPool);
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d querymgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, void* qInfo) {
if (pMgmt == NULL) {
return NULL;
}
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
return NULL;
} else {
void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, void** key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
if (handle == NULL || *handle == NULL) {
return NULL;
} else {
return handle;
}
}
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree);
return 0;
}
......@@ -53,7 +53,7 @@ typedef struct {
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
void *qHandlePool; // query handle pool
void *qMgmt;
char *rootDir;
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
......
......@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static void vnodeFreeqHandle(void* phandle);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
......@@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool
pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
......@@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) {
return;
}
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
......@@ -475,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections
taosCacheCleanup(pVnode->qHandlePool);
qSetQueryMgmtClosed(pVnode->qMgmt);
vnodeRelease(pVnode);
}
......@@ -881,12 +881,3 @@ PARSE_OVER:
if(fp) fclose(fp);
return terrno;
}
void vnodeFreeqHandle(void *qHandle) {
void** handle = qHandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillQuery(*handle);
}
\ No newline at end of file
......@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include <dnode.h>
#include "os.h"
#include "tglobal.h"
......@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
void* handle = NULL;
if ((void**) killQueryMsg->qhandle != NULL) {
handle = *(void**) killQueryMsg->qhandle;
}
vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
// this message arrived here by means of the *query* message, so release the vnode is necessary
void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else {
taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
assert(qhandle == (void**) killQueryMsg->qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
}
vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED;
}
......@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = NULL;
if (contLen != 0) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
......@@ -104,25 +109,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// add lock here
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
// NOTE: there two refcount, needs to kill twice, todo refactor
// query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo);
qKillQuery(pQInfo);
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
}
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code;
}
handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
} else {
assert(pQInfo == NULL);
vnodeRelease(pVnode);
}
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
......@@ -137,9 +147,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pQInfo != NULL) {
qTableQuery(pQInfo); // do execute query
assert(handle != NULL);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
}
return code;
......@@ -158,7 +167,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0;
void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE;
}
......@@ -166,8 +175,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) {
if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
......@@ -177,27 +186,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->completed = true;
pRsp->useconds = 0;
} else { // todo handle error
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
return ret;
}
vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
int32_t code = qRetrieveQueryResultInfo(*pQInfo);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(*pQInfo)) {
if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutQhandleIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
code = TSDB_CODE_SUCCESS;
} else { // no further execution invoked, release the ref to vnode
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册