提交 733440b3 编写于 作者: M Minghao Li

refactor(sync): add double queues in vnode

上级 b3b0e5a5
...@@ -34,6 +34,7 @@ typedef enum { ...@@ -34,6 +34,7 @@ typedef enum {
WRITE_QUEUE, WRITE_QUEUE,
APPLY_QUEUE, APPLY_QUEUE,
SYNC_QUEUE, SYNC_QUEUE,
SYNC_CTRL_QUEUE,
STREAM_QUEUE, STREAM_QUEUE,
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;
......
...@@ -207,6 +207,7 @@ typedef struct SSyncInfo { ...@@ -207,6 +207,7 @@ typedef struct SSyncInfo {
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); int32_t syncInit();
......
...@@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
// ----------------------------------------- // -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
......
...@@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { ...@@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
SQWorkerPool streamPool; SQWorkerPool streamPool;
SWWorkerPool fetchPool; SWWorkerPool fetchPool;
SWWorkerPool syncPool; SWWorkerPool syncPool;
SWWorkerPool syncCtrlPool;
SWWorkerPool writePool; SWWorkerPool writePool;
SWWorkerPool applyPool; SWWorkerPool applyPool;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
...@@ -60,6 +61,7 @@ typedef struct { ...@@ -60,6 +61,7 @@ typedef struct {
SVnode *pImpl; SVnode *pImpl;
STaosQueue *pWriteQ; STaosQueue *pWriteQ;
STaosQueue *pSyncQ; STaosQueue *pSyncQ;
STaosQueue *pSyncCtrlQ;
STaosQueue *pApplyQ; STaosQueue *pApplyQ;
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue *pStreamQ; STaosQueue *pStreamQ;
...@@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); ...@@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -423,6 +423,8 @@ SArray *vmGetMsgHandles() { ...@@ -423,6 +423,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
} }
} }
static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
...@@ -217,6 +233,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -217,6 +233,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
return code; return code;
} }
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); }
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
...@@ -301,6 +319,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { ...@@ -301,6 +319,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
...@@ -325,6 +344,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -325,6 +344,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
...@@ -370,6 +390,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { ...@@ -370,6 +390,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pSPool->max = tsNumOfVnodeSyncThreads; pSPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSPool) != 0) return -1; if (tWWorkerInit(pSPool) != 0) return -1;
SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
pSCPool->name = "vnode-sync-ctrl";
pSCPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSCPool) != 0) return -1;
SSingleWorkerCfg mgmtCfg = { SSingleWorkerCfg mgmtCfg = {
.min = 1, .min = 1,
.max = 1, .max = 1,
...@@ -398,6 +423,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { ...@@ -398,6 +423,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool); tWWorkerCleanup(&pMgmt->syncPool);
tWWorkerCleanup(&pMgmt->syncCtrlPool);
tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->streamPool); tQWorkerCleanup(&pMgmt->streamPool);
tWWorkerCleanup(&pMgmt->fetchPool); tWWorkerCleanup(&pMgmt->fetchPool);
......
...@@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
......
...@@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
} }
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
if (pSyncNode == NULL) {
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
syncHeartbeatDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
syncHeartbeatReplyDestroy(pSyncMsg);
} else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
code = -1;
}
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
code);
syncNodeRelease(pSyncNode);
if (code != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return code;
}
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
...@@ -473,6 +516,19 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -473,6 +516,19 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return code; return code;
} }
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) {
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) { if (msgcb == NULL) {
return -1; return -1;
...@@ -758,6 +814,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -758,6 +814,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
.msgcb = NULL, .msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg, .FpSendMsg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg, .FpEqMsg = vnodeSyncEqMsg,
.FpEqCtrlMsg = vnodeSyncEqCtrlMsg,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
......
...@@ -73,6 +73,7 @@ typedef struct SSyncNode { ...@@ -73,6 +73,7 @@ typedef struct SSyncNode {
const SMsgCb* msgcb; const SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
// init internal // init internal
SNodeInfo myNodeInfo; SNodeInfo myNodeInfo;
...@@ -287,6 +288,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -287,6 +288,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);
......
...@@ -974,6 +974,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -974,6 +974,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg;
// init raft config // init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
...@@ -2698,6 +2699,45 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -2698,6 +2699,45 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret; return ret;
} }
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
syncLogRecvHeartbeat(ths, pMsg, "");
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
pMsgReply->destId = pMsg->srcId;
pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
if (pMsg->term == ths->pRaftStore->currentTerm) {
sInfo("vgId:%d, heartbeat reset timer", 1);
syncNodeResetElectTimer(ths);
}
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
return 0;
}
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
syncLogRecvHeartbeatReply(ths, pMsg, "");
// update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime);
return 0;
}
// TLA+ Spec // TLA+ Spec
// ClientRequest(i, v) == // ClientRequest(i, v) ==
// /\ state[i] = Leader // /\ state[i] = Leader
...@@ -3304,3 +3344,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -3304,3 +3344,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册