diff --git a/src/balance/src/bnMain.c b/src/balance/src/bnMain.c index 7725aa5db4ca9de5b72d8a900a4d9e0b3c0200db..3e1d5eda763c99afb8a0c5dcd3f08996eec80337 100644 --- a/src/balance/src/bnMain.c +++ b/src/balance/src/bnMain.c @@ -237,21 +237,21 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { bool isReady = false; for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pVnode = pVgroup->vnodeGid + i; + SDnodeObj *pDnode = pVnode->pDnode; if (pVnode == pRmVnode) continue; int32_t vver = mnodeGetVgidVer(pVnode->vver); - mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d" , pVgroup->vgId, i, - pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); - if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue; - if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; + mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d", pVgroup->vgId, i, + pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); + if (pDnode->status == TAOS_DN_STATUS_DROPPING) continue; + if (pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) continue; if (rmVnodeVer == 0 || vver >= rmVnodeVer) { - mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d", pVgroup->vgId, i, - pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); + mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d", + pVgroup->vgId, i, pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); + isReady = true; } - - isReady = true; } return isReady; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a3a88e8b7b143684f047ac395b91e830c2884fe8..571c7d667adfd285387ab0c11a308dc7472bb0af 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -101,8 +101,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara return syncCode; } -static int32_t vnodeCheckWrite(void *vparam) { - SVnodeObj *pVnode = vparam; +static int32_t vnodeCheckWrite(SVnodeObj *pVnode) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_VND_NO_WRITE_AUTH; @@ -216,29 +215,21 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } -int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { - SVnodeObj *pVnode = vparam; - SWalHead * pHead = wparam; - int32_t code = 0; - - if (qtype == TAOS_QTYPE_RPC) { - code = vnodeCheckWrite(pVnode); - if (code != TSDB_CODE_SUCCESS) return code; - } - +static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32_t qtype, SRpcMsg *pRpcMsg) { if (pHead->len > TSDB_MAX_WAL_SIZE) { vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version); - return TSDB_CODE_WAL_SIZE_LIMIT; + terrno = TSDB_CODE_WAL_SIZE_LIMIT; + return NULL; } int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; SVWriteMsg *pWrite = taosAllocateQitem(size); if (pWrite == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; } - if (rparam != NULL) { - SRpcMsg *pRpcMsg = rparam; + if (pRpcMsg != NULL) { pWrite->rpcMsg = *pRpcMsg; } @@ -248,6 +239,21 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar atomic_add_fetch_32(&pVnode->refCount, 1); + return pWrite; +} + +static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + + if (pWrite->qtype == TAOS_QTYPE_RPC) { + int32_t code = vnodeCheckWrite(pVnode); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pWrite); + vnodeRelease(pVnode); + return code; + } + } + int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; @@ -256,15 +262,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar taosMsleep(ms); } - code = vnodePerformFlowCtrl(pWrite); - if (code != 0) return 0; - vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); - taosWriteQitem(pVnode->wqueue, qtype, pWrite); + taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); return TSDB_CODE_SUCCESS; } +int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { + SVWriteMsg *pWrite = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam); + if (pWrite == NULL) { + assert(terrno != 0); + return terrno; + } + + int32_t code = vnodePerformFlowCtrl(pWrite); + if (code != 0) return 0; + + return vnodeWriteToWQueueImp(pWrite); +} + void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { SVnodeObj *pVnode = vparam; @@ -294,7 +310,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite, pWrite->processedCount); pWrite->processedCount = 0; - taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); + code = vnodeWriteToWQueueImp(pWrite); + if (code != 0) { + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } } } } @@ -318,4 +337,4 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { pWrite->processedCount); return TSDB_CODE_VND_ACTION_IN_PROGRESS; } -} +} \ No newline at end of file