提交 c1498e4a 编写于 作者: S Shengliang Guan

fix: invalid vnode ref while drop stream

上级 afc316ef
...@@ -412,8 +412,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -412,8 +412,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
#if 0
tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
#else
tsNumOfVnodeFetchThreads = 1;
#endif
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = tsNumOfCores;
......
...@@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { ...@@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) { if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); // dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
pVnodes[num++] = (*ppVnode); pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} else { } else {
......
...@@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { ...@@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || pVnode->dropped) { if (pVnode == NULL || pVnode->dropped) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
pVnode = NULL;
} else { } else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount); // dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
...@@ -80,6 +81,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -80,6 +81,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock(&pMgmt->lock); taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
while (pVnode->refCount > 0) taosMsleep(10); while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
......
...@@ -206,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -206,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;
taosFreeQitem(pMsg);
} }
return code; return code;
...@@ -237,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { ...@@ -237,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
default: default:
break; break;
} }
}
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
}
return size; return size;
} }
...@@ -255,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -255,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return -1; return -1;
} }
dDebug("vgId:%d, queue is alloced", pVnode->vgId); dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ);
dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ);
dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ);
dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册