提交 3206bf96 编写于 作者: H Haojun Liao

[td-545] fix bugs in vnode ref management

上级 7649770d
...@@ -32,11 +32,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs ...@@ -32,11 +32,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs
/** /**
* Destroy QInfo object * Destroy QInfo object
* * @param qinfo qhandle
* @param qinfo * @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @return * @param param free callback params
*/ */
void qDestroyQueryInfo(qinfo_t qinfo); void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param);
/** /**
* the main query execution function, including query on both table and multitables, * the main query execution function, including query on both table and multitables,
...@@ -45,7 +45,7 @@ void qDestroyQueryInfo(qinfo_t qinfo); ...@@ -45,7 +45,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo * @param qinfo
* @return * @return
*/ */
void qTableQuery(qinfo_t qinfo); void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
...@@ -80,9 +80,12 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); ...@@ -80,9 +80,12 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/** /**
* kill current ongoing query and free query handle automatically * kill current ongoing query and free query handle automatically
* @param qinfo * @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
* @return
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -5981,7 +5981,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) { ...@@ -5981,7 +5981,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) {
freeQInfo(pQInfo); freeQInfo(pQInfo);
} }
void qDestroyQueryInfo(qinfo_t qHandle) { void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) {
SQInfo* pQInfo = (SQInfo*) qHandle; SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) { if (!isValidQInfo(pQInfo)) {
return; return;
...@@ -5992,10 +5992,14 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -5992,10 +5992,14 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
if (ref == 0) { if (ref == 0) {
doDestoryQueryInfo(pQInfo); doDestoryQueryInfo(pQInfo);
if (fp != NULL) {
fp(param);
}
} }
} }
void qTableQuery(qinfo_t qinfo) { void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
...@@ -6005,7 +6009,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -6005,7 +6009,7 @@ void qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p it is already killed, abort", pQInfo); qTrace("QInfo:%p it is already killed, abort", pQInfo);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo, fp, param);
return; return;
} }
...@@ -6021,7 +6025,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -6021,7 +6025,7 @@ void qTableQuery(qinfo_t qinfo) {
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo, fp, param);
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
...@@ -6114,7 +6118,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6114,7 +6118,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return code; return code;
} }
int32_t qKillQuery(qinfo_t qinfo) { int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
...@@ -6122,7 +6126,7 @@ int32_t qKillQuery(qinfo_t qinfo) { ...@@ -6122,7 +6126,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
} }
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo, fp, param);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -89,7 +89,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -89,7 +89,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn("QInfo:%p connection %p broken, kill query", killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p connection %p broken, kill query", killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
qKillQuery((qinfo_t) killQueryMsg->qhandle); // this message arrived here by means of the query message, so release the vnode is necessary
qKillQuery((qinfo_t) killQueryMsg->qhandle, vnodeRelease, pVnode);
vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code
} }
...@@ -112,8 +115,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -112,8 +115,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
//NOTE: there two refcount, needs to kill twice, todo refactor //NOTE: there two refcount, needs to kill twice, todo refactor
qKillQuery(pQInfo); qKillQuery(pQInfo, vnodeRelease, pVnode);
qKillQuery(pQInfo); qKillQuery(pQInfo, vnodeRelease, pVnode);
return pRsp->code; return pRsp->code;
} }
...@@ -128,7 +131,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -128,7 +131,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pQInfo != NULL) { if (pQInfo != NULL) {
vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
qTableQuery(pQInfo); // do execute query qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query
} }
return code; return code;
...@@ -146,7 +149,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -146,7 +149,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
int32_t ret = qKillQuery(pQInfo); int32_t ret = qKillQuery(pQInfo, vnodeRelease, pVnode);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp); pRet->len = sizeof(SRetrieveTableRsp);
...@@ -175,8 +178,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -175,8 +178,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet->qhandle = pQInfo; pRet->qhandle = pQInfo;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo, vnodeRelease, pVnode);
vnodeRelease(pVnode);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册