diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index adc18fdced4777fcfdc731993ce758e301c34ac9..0f7daf0e1d8e0fab76f4715261624e5953ea3a31 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -52,10 +52,9 @@ typedef struct { typedef struct { int32_t vgId; - int32_t refCount; int32_t vgVersion; + int32_t refCount; int8_t dropped; - int8_t accessState; char *path; SVnode *pImpl; STaosQueue *pWriteQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4c5a32536f13440e9abfaec0bf79eee6301aa761..0ae6c2b3360553e33f000b245568d833a53faf75 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -323,7 +323,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 23927255bb8c920bfaf9de12e91bbffe4bd1de39..9e4e7713f2b1f9e63129f02279ed3742b1b55937 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -49,10 +49,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } pVnode->vgId = pCfg->vgId; - pVnode->refCount = 0; pVnode->vgVersion = pCfg->vgVersion; + pVnode->refCount = 0; pVnode->dropped = 0; - pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->path = tstrdup(pCfg->path); pVnode->pImpl = pImpl; @@ -96,7 +95,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dDebug("vgId:%d, vnode is closed", pVnode->vgId); if (pVnode->dropped) { - dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped); + dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); vnodeDestroy(path, pMgmt->pTfs); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 427b3d5c94e49883f7349c133d6d8bfc6111a59a..b6913f93f271dc9ba30de6886bc2e405114f1267 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -53,9 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { } if (IsReq(pMsg)) { - if (code != 0 && terrno != 0) { + if (code != 0) { + if (terrno != 0) code = terrno; dError("msg:%p failed to process since %s", pMsg, terrstr()); - code = terrno; } vmSendRsp(pMsg, code); } @@ -97,110 +97,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - int32_t code = 0; - SRpcMsg *pMsg = NULL; - SVnodeObj *pVnode = pInfo->ahandle; - int64_t sync = vnodeGetSyncHandle(pVnode->pImpl); - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg **)); - - for (int32_t m = 0; m < numOfMsgs; m++) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("vgId:%d, msg:%p get from vnode-write queue", pVnode->vgId, pMsg); - - if (taosArrayPush(pArray, &pMsg) == NULL) { - dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg); - vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); - } - } - - for (int32_t m = 0; m < taosArrayGetSize(pArray); m++) { - pMsg = *(SRpcMsg **)taosArrayGet(pArray, m); - code = vnodePreprocessReq(pVnode->pImpl, pMsg); - - if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg); - continue; - } - - if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) { - code = syncPropose(sync, pMsg, false); - } - - if (code == TAOS_SYNC_PROPOSE_SUCCESS) { - dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg); - continue; - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { - SEpSet newEpSet = {0}; - syncGetEpSet(sync, &newEpSet); - SEp *pEp = &newEpSet.eps[newEpSet.inUse]; - if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { - newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; - } - - dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg, - newEpSet.numOfEps, newEpSet.inUse); - for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { - dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); - } - - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - tmsgSendRedirectRsp(&rsp, &newEpSet); - } else { - dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code); - vmSendRsp(pMsg, code); - } - } - - for (int32_t i = 0; i < numOfMsgs; i++) { - pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - dTrace("vgId:%d, msg:%p is freed", pVnode->vgId, pMsg); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - - taosArrayDestroy(pArray); -} - -static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - - for (int32_t i = 0; i < numOfMsgs; ++i) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("vgId:%d, msg:%p get from vnode-apply queue", pVnode->vgId, pMsg); - - // init response rpc msg - SRpcMsg rsp = {0}; - - // get original rpc msg - assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); - syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); - SRpcMsg originalRpcMsg; - syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); - - // apply data into tsdb - if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { - rsp.code = terrno; - dError("vgId:%d, msg:%p failed to apply since %s", pVnode->vgId, pMsg, terrstr()); - } - - syncApplyMsgDestroy(pSyncApplyMsg); - rpcFreeCont(originalRpcMsg.pCont); - - // if leader, send response - if (pMsg->info.handle != NULL) { - rsp.info = pMsg->info; - tmsgSendRsp(&rsp); - } - - dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, rsp.code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg *pMsg = NULL; @@ -322,7 +218,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { if (pMsg == NULL) return -1; SMsgHead *pHead = pRpc->pCont; - dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); @@ -362,9 +258,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { } int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); @@ -381,8 +277,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8c7a78a5afb7b7620a1dc2f64ae8cc70b042bff6..01e57d5eafe8a51ac60f7b10033ffa1df38c28f8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -71,6 +71,9 @@ int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); +void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); + // meta typedef struct SMeta SMeta; // todo: remove typedef struct SMetaReader SMetaReader; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 5f4f7e70daf089d22fa4e80978e787d10dc08c09..b0599b82ef3f7a5d62d43190ff4d09ebe428e001 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -84,7 +84,6 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -int32_t vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 77005767fe2517ce3549f47cf28490ecd61d7c2a..377dcf1b6a96a17ebb08e5e2ec474d96f5c7ad21 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -232,8 +232,10 @@ struct SVnode { SWal* pWal; STQ* pTq; SSink* pSink; - int64_t sync; tsem_t canCommit; + int64_t sync; + int32_t syncCount; + sem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a85b8306165326bc07f643718e9b67201e668de6..dfc258b42b75bb3b5da4012a4f44ed0c7d337f05 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -81,7 +81,9 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; + pVnode->syncCount = 0; + tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); // open buffer pool @@ -175,6 +177,7 @@ void vnodeClose(SVnode *pVnode) { vnodeCloseBufPool(pVnode); // destroy handle tsem_destroy(&(pVnode->canCommit)); + tsem_destroy(&pVnode->syncSem); taosMemoryFree(pVnode); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a9c9ef393a34da7317daf3f32cfef042288e446..1524d8fe6fc246578c9732a8caceed0d823e6070 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -91,9 +91,6 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { } } break; - case TDMT_VND_ALTER_REPLICA: { - code = vnodeSyncAlter(pVnode, pMsg); - } break; default: break; } @@ -107,7 +104,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; - vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); pVnode->state.applied = version; @@ -173,7 +170,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; } - vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); + vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8f98e9cec57b810adcbacb989b6179ac7fc9b2ad..efff466c187428bb56cdfb6711073204d96c2af1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -16,41 +16,42 @@ #define _DEFAULT_SOURCE #include "vnd.h" -static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg); -static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg); -static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode); -static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); +static inline bool vnodeIsMsgBlock(tmsg_t type) { + return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA); +} -int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { - SSyncInfo syncInfo = { - .vgId = pVnode->config.vgId, - .isStandBy = pVnode->config.standby, - .syncCfg = pVnode->config.syncCfg, - .pWal = pVnode->pWal, - .msgcb = NULL, - .FpSendMsg = vnodeSyncSendMsg, - .FpEqMsg = vnodeSyncEqMsg, - }; +static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); - syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); +static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) { + if (!vnodeIsMsgBlock(type)) return; - pVnode->sync = syncOpen(&syncInfo); - if (pVnode->sync <= 0) { - vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); - return -1; - } + int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1); + vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); +} - setPingTimerMS(pVnode->sync, 3000); - setElectTimerMS(pVnode->sync, 500); - setHeartbeatTimerMS(pVnode->sync, 100); - return 0; +static inline void vnodeWaitBlockMsg(SVnode *pVnode) { + int32_t count = atomic_load_32(&pVnode->syncCount); + if (count <= 0) return; + + vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); + tsem_wait(&pVnode->syncSem); + vTrace("vgId:%d, ===> block finish, count:%d", pVnode->config.vgId, count); } -int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) { +static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { + if (!vnodeIsMsgBlock(type)) return; + + int32_t count = atomic_load_32(&pVnode->syncCount); + if (count <= 0) return; + + count = atomic_sub_fetch_32(&pVnode->syncCount, 1); + vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); + if (count <= 0) { + tsem_post(&pVnode->syncSem); + } +} + +static int32_t vnodeProcessSyncReconfigReq(SVnode *pVnode, SRpcMsg *pMsg) { SAlterVnodeReq req = {0}; if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -66,29 +67,107 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) { vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); } - int32_t code = syncReconfig(pVnode->sync, &cfg); - if (code == TAOS_SYNC_PROPOSE_SUCCESS) { - // todo refactor - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - tmsgSendRsp(&rsp); - return TSDB_CODE_ACTION_IN_PROGRESS; + return syncReconfig(pVnode->sync, &cfg); +} + +void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnode *pVnode = pInfo->ahandle; + int32_t vgId = pVnode->config.vgId; + int32_t code = 0; + SRpcMsg *pMsg = NULL; + + for (int32_t m = 0; m < numOfMsgs; m++) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + vTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); + + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + code = vnodeProcessSyncReconfigReq(pVnode, pMsg); + } else { + code = vnodePreprocessReq(pVnode, pMsg); + if (code != 0) { + vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr()); + } else { + code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); + } + } + + if (code == 0) { + vnodeAccumBlockMsg(pVnode, pMsg->msgType); + } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + SEpSet newEpSet = {0}; + syncGetEpSet(pVnode->sync, &newEpSet); + SEp *pEp = &newEpSet.eps[newEpSet.inUse]; + if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { + newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; + } + + vTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, + newEpSet.inUse); + for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { + vTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); + } + + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + tmsgSendRedirectRsp(&rsp, &newEpSet); + } else { + if (terrno != 0) code = terrno; + vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } + + vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } - return code; + vnodeWaitBlockMsg(pVnode); } -void vnodeSyncStart(SVnode *pVnode) { - syncSetMsgCb(pVnode->sync, &pVnode->msgCb); - if (pVnode->config.standby) { - syncStartStandBy(pVnode->sync); - } else { - syncStart(pVnode->sync); +void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnode *pVnode = pInfo->ahandle; + int32_t vgId = pVnode->config.vgId; + int32_t code = 0; + SRpcMsg *pMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + vTrace("vgId:%d, msg:%p get from vnode-apply queue", vgId, pMsg); + + // init response rpc msg + SRpcMsg rsp = {0}; + + // get original rpc msg + assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); + SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); + syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); + SRpcMsg originalRpcMsg; + syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); + + // apply data into tsdb + if (vnodeProcessWriteReq(pVnode, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { + rsp.code = terrno; + vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); + } + + syncApplyMsgDestroy(pSyncApplyMsg); + rpcFreeCont(originalRpcMsg.pCont); + + vnodePostBlockMsg(pVnode, originalRpcMsg.msgType); + + // if leader, send response + if (pMsg->info.handle != NULL) { + rsp.info = pMsg->info; + tmsgSendRsp(&rsp); + } + + vTrace("vgId:%d, msg:%p is freed, code:0x%x handle:%p", vgId, pMsg, rsp.code, pMsg->info.handle); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } } -void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } - -int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { +static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -97,7 +176,7 @@ int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return code; } -int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { +static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t code = tmsgSendReq(pEpSet, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -106,19 +185,19 @@ int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { vnodeGetSnapshot(pFsm->data, pSnapshot); return 0; } -void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { +static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { SVnode *pVnode = pFsm->data; vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); // todo rpc response here } -void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot = {0}; @@ -164,7 +243,7 @@ void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) } } -void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, @@ -172,14 +251,14 @@ void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMet syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { +static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; pFsm->FpCommitCb = vnodeSyncCommitMsg; @@ -188,6 +267,42 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = NULL; pFsm->FpReConfigCb = vnodeSyncReconfig; - return pFsm; -} \ No newline at end of file +} + +int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { + SSyncInfo syncInfo = { + .vgId = pVnode->config.vgId, + .isStandBy = pVnode->config.standby, + .syncCfg = pVnode->config.syncCfg, + .pWal = pVnode->pWal, + .msgcb = NULL, + .FpSendMsg = vnodeSyncSendMsg, + .FpEqMsg = vnodeSyncEqMsg, + }; + + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); + syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); + + pVnode->sync = syncOpen(&syncInfo); + if (pVnode->sync <= 0) { + vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); + return -1; + } + + setPingTimerMS(pVnode->sync, 3000); + setElectTimerMS(pVnode->sync, 500); + setHeartbeatTimerMS(pVnode->sync, 100); + return 0; +} + +void vnodeSyncStart(SVnode *pVnode) { + syncSetMsgCb(pVnode->sync, &pVnode->msgCb); + if (pVnode->config.standby) { + syncStartStandBy(pVnode->sync); + } else { + syncStart(pVnode->sync); + } +} + +void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }