提交 bc8e6b7f 编写于 作者: S Shengliang Guan

refactor: adjust vnode sync

上级 ae9e11bb
...@@ -33,7 +33,7 @@ SSyncNode* syncNodeAcquire(int64_t rid); ...@@ -33,7 +33,7 @@ SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);
......
...@@ -33,7 +33,10 @@ extern int32_t tsRpcHeadSize; ...@@ -33,7 +33,10 @@ extern int32_t tsRpcHeadSize;
typedef struct { typedef struct {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
char user[TSDB_USER_LEN]; union {
char user[TSDB_USER_LEN];
int64_t applyIndex;
};
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
......
...@@ -35,7 +35,6 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) { ...@@ -35,7 +35,6 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
tsem_wait(&pVnode->syncSem); tsem_wait(&pVnode->syncSem);
vTrace("vgId:%d, ===> block finish, count:%d", pVnode->config.vgId, count);
} }
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
...@@ -93,6 +92,11 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -93,6 +92,11 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (code == 0) { if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType); vnodeAccumBlockMsg(pVnode, pMsg->msgType);
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
// todo refactor
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
}
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
SEpSet newEpSet = {0}; SEpSet newEpSet = {0};
syncGetEpSet(pVnode->sync, &newEpSet); syncGetEpSet(pVnode->sync, &newEpSet);
...@@ -132,36 +136,22 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -132,36 +136,22 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
vTrace("vgId:%d, msg:%p get from vnode-apply queue", vgId, pMsg); vTrace("vgId:%d, msg:%p get from vnode-apply queue, handle:%p", vgId, pMsg, pMsg->info.handle);
// init response rpc msg
SRpcMsg rsp = {0};
// get original rpc msg
assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
SRpcMsg originalRpcMsg;
syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
// apply data into tsdb
if (vnodeProcessWriteReq(pVnode, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
}
syncApplyMsgDestroy(pSyncApplyMsg);
rpcFreeCont(originalRpcMsg.pCont);
vnodePostBlockMsg(pVnode, originalRpcMsg.msgType); SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
}
}
// if leader, send response vnodePostBlockMsg(pVnode, pMsg->msgType);
if (pMsg->info.handle != NULL) { if (rsp.info.handle != NULL) {
rsp.info = pMsg->info;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
vTrace("vgId:%d, msg:%p is freed, code:0x%x handle:%p", vgId, pMsg, rsp.code, pMsg->info.handle); vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -195,43 +185,33 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigC ...@@ -195,43 +185,33 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigC
vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));
// todo rpc response here // todo rpc response here
// build rpc msg
// put into apply queue
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
SSnapshot snapshot = {0};
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0};
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot = {0}; (*pFsm->FpGetSnapshot)(pFsm, &snapshot);
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
if (cbMeta.index > beginIndex) { if (cbMeta.index > beginIndex) {
char logBuf[256] = {0};
snprintf( snprintf(
logBuf, sizeof(logBuf), logBuf, sizeof(logBuf),
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = pFsm->data; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
SRpcMsg applyMsg; memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
syncApplyMsgDestroy(pSyncApplyMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
// recover handle for response
SRpcMsg saveRpcMsg;
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
applyMsg.info = saveRpcMsg.info;
} else {
applyMsg.info.handle = NULL;
applyMsg.info.ahandle = NULL;
}
// put to applyQ
tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg);
} else { } else {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
......
...@@ -281,7 +281,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { ...@@ -281,7 +281,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return ret; return ret;
} }
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
...@@ -291,7 +291,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { ...@@ -291,7 +291,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
SRespStub stub; SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) { if (ret == 1) {
memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); *pInfo = stub.rpcMsg.info;
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册