提交 4c9f10b6 编写于 作者: M Minghao Li

refactor(sync): add sync ctrl queue in vnode

上级 19458755
......@@ -34,6 +34,7 @@ typedef enum {
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
SYNC_CTRL_QUEUE,
STREAM_QUEUE,
QUEUE_MAX,
} EQueueType;
......
......@@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
// -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
......
......@@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
SQWorkerPool streamPool;
SWWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool syncCtrlPool;
SWWorkerPool writePool;
SWWorkerPool applyPool;
SSingleWorker mgmtWorker;
......@@ -60,6 +61,7 @@ typedef struct {
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pSyncCtrlQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pStreamQ;
......@@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
......@@ -422,6 +422,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
code = 0;
......
......@@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
const STraceId *trace = &pMsg->info.traceId;
SMsgHead *pHead = pMsg->pCont;
......@@ -203,6 +219,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
case SYNC_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncCtrlQ, pMsg);
break;
case APPLY_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pApplyQ, pMsg);
......@@ -219,6 +239,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
}
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
......@@ -301,6 +325,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
......@@ -325,6 +350,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
......@@ -370,6 +396,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pSPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSPool) != 0) return -1;
SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
pSCPool->name = "vnode-sync-ctrl";
pSCPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSCPool) != 0) return -1;
SSingleWorkerCfg mgmtCfg = {
.min = 1,
.max = 1,
......@@ -398,6 +429,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool);
tWWorkerCleanup(&pMgmt->syncCtrlPool);
tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->streamPool);
tWWorkerCleanup(&pMgmt->fetchPool);
......
......@@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
......
......@@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
if (pSyncNode == NULL) {
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
syncHeartbeatDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
syncHeartbeatReplyDestroy(pSyncMsg);
} else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
code = -1;
}
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
code);
syncNodeRelease(pSyncNode);
if (code != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return code;
}
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
......
......@@ -287,6 +287,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
......
......@@ -2282,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
//ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
// ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
......@@ -2676,7 +2676,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1;
}
syncNodeReplicate(ths, false);
......@@ -2726,6 +2726,55 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret;
}
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
syncLogRecvHeartbeat(ths, pMsg, "");
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
pMsgReply->destId = pMsg->srcId;
pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(ths, "become follower by hb");
}
if (pMsg->term == ths->pRaftStore->currentTerm) {
// sInfo("vgId:%d, heartbeat reset timer", ths->vgId);
syncNodeResetElectTimer(ths);
#if 0
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollowerCommit(ths, pMsg->commitIndex);
}
#endif
}
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
return 0;
}
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
syncLogRecvHeartbeatReply(ths, pMsg, "");
// update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime);
return 0;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
......@@ -2754,7 +2803,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
// del resp mgr, call FpCommitCb
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1;
}
......@@ -3072,9 +3121,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
} else {
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
if (code != 0) {
sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i);
return -1;
}
sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i);
return -1;
}
ASSERT(pEntry != NULL);
}
......@@ -3335,3 +3384,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册