diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index dbe44586813e27fba988f57efacfbd0cfce4134a..6bc057e5ac6e5b453ef123014fe0620867c7ab45 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -16,23 +16,28 @@ #define _DEFAULT_SOURCE #include "vnd.h" +#define BATCH_DISABLE 1 + static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || - (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL); + (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || + (type == TDMT_VND_ALTER_REPLICA); } static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); tsem_wait(&pVnode->syncSem); } } static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); tsem_post(&pVnode->syncSem); } } @@ -124,60 +129,137 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { tmsgSendRedirectRsp(&rsp, &newEpSet); } +static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { + rsp.code = terrno; + const STraceId *trace = &pMsg->info.traceId; + vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr()); + } + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } +} + +static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) { + if (code == TSDB_CODE_SYN_NOT_LEADER) { + vnodeRedirectRpcMsg(pVnode, pMsg); + } else { + const STraceId *trace = &pMsg->info.traceId; + vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code); + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + } +} + +static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg); + + if (code > 0) { + ASSERT(0); + } else if (code == 0) { + vnodeWaitBlockMsg(pVnode, pMsg); + } else { + if (terrno != 0) code = terrno; + vnodeHandleProposeError(pVnode, pMsg, code); + } + + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { + if (*arrSize <= 0) return; + +#if BATCH_DISABLE + int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]); +#else + int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize); +#endif + + if (code > 0) { + for (int32_t i = 0; i < *arrSize; ++i) { + vnodeHandleWriteMsg(pVnode, pMsgArr[i]); + } + } else if (code == 0) { + vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]); + } else { + if (terrno != 0) code = terrno; + for (int32_t i = 0; i < *arrSize; ++i) { + vnodeHandleProposeError(pVnode, pMsgArr[i], code); + } + } + + for (int32_t i = 0; i < *arrSize; ++i) { + SRpcMsg *pMsg = pMsgArr[i]; + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + + *arrSize = 0; +} + void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; - + int32_t arrayPos = 0; + SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*)); + bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool)); vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs); - for (int32_t m = 0; m < numOfMsgs; m++) { + for (int32_t msg = 0; msg < numOfMsgs; msg++) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + bool isWeak = vnodeIsMsgWeak(pMsg->msgType); + bool isBlock = vnodeIsMsgBlock(pMsg->msgType); + const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); + vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg, + isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); + + if (pMsgArr == NULL || pIsWeakArr == NULL) { + vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg); + vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; + } code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { - vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); - } else { - if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { - code = vnodeProcessAlterReplicaReq(pVnode, pMsg); - } else { - code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); - if (code > 0) { - SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; - if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { - rsp.code = terrno; - vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); - } - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } else if (code == 0) { - vnodeWaitBlockMsg(pVnode, pMsg); - } else { - } - } + vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; } - if (code < 0) { - if (terrno == TSDB_CODE_SYN_NOT_LEADER) { - vnodeRedirectRpcMsg(pVnode, pMsg); - } else { - if (terrno != 0) code = terrno; - vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + vnodeHandleAlterReplicaReq(pVnode, pMsg); + continue; } - vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); + if (isBlock || BATCH_DISABLE) { + vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); + } + + pMsgArr[arrayPos] = pMsg; + pIsWeakArr[arrayPos] = isWeak; + arrayPos++; + + if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) { + vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); + } } + + taosMemoryFree(pMsgArr); + taosMemoryFree(pIsWeakArr); } void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {