提交 3e5e8215 编写于 作者: S Shengliang Guan

refactor: add lock vnode hash

上级 e8d0bcef
...@@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { ...@@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
if (pVnode && num < size) { if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); // dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode); pVnodes[num++] = (*ppVnode);
num++;
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} else { } else {
taosHashCancelIterate(pMgmt->hash, pIter); taosHashCancelIterate(pMgmt->hash, pIter);
......
...@@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode); vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl); vnodeClose(pVnode->pImpl);
...@@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) { ...@@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) {
} }
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMgmt->hash == NULL) { if (pMgmt->hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to init vnode hash since %s", terrstr()); dError("failed to init vnode hash since %s", terrstr());
...@@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { ...@@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->state.totalVnodes = numOfVnodes; pMgmt->state.totalVnodes = numOfVnodes;
int32_t threadNum = 1; int32_t threadNum = tsNumOfCores / 2;
if (threadNum < 1) threadNum = 0;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
......
...@@ -185,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -185,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1; if (pMsg == NULL) {
rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL;
return -1;
}
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
...@@ -193,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -193,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
return vmPutMsgToQueue(pMgmt, pMsg, qtype);
int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL;
}
return code;
} }
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
......
...@@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { ...@@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
} }
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
SRpcMsg * pMsg = NULL; SRpcMsg *pMsg = NULL;
SMgmtWrapper *pWrapper = NULL; SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
...@@ -185,6 +185,7 @@ _OVER: ...@@ -185,6 +185,7 @@ _OVER:
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
} }
dmReleaseWrapper(pWrapper); dmReleaseWrapper(pWrapper);
...@@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray * pArray = (*pWrapper->func.getHandlesFp)(); SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1; if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle * pMgmt = taosArrayGet(pArray, i); SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) { if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId; pHandle->needCheckVgId = pMgmt->needCheckVgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册