提交 2b256542 编写于 作者: S Shengliang Guan

refact: add vnode timer

上级 b89877b4
......@@ -230,7 +230,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
......@@ -240,6 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
void syncSendTimeoutRsp(int64_t rid, int64_t seq);
SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
......@@ -497,6 +497,9 @@ enum {
// sort page size by default
#define DEFAULT_PAGESIZE 4096
#define VNODE_TIMEOUT_SEC 60
#define MNODE_TIMEOUT_SEC 10
#ifdef __cplusplus
}
#endif
......
......@@ -334,7 +334,18 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
taosMemoryFree(pMgmt);
}
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {}
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeSyncCheckTimeout(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
}
static void *vmThreadFp(void *param) {
SVnodeMgmt *pMgmt = param;
......@@ -348,7 +359,7 @@ static void *vmThreadFp(void *param) {
if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10;
if (sec % (tsStatusInterval * 5) == 0) {
if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
vmCheckSyncTimeout(pMgmt);
}
}
......
......@@ -54,6 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode* pVnode);
void vnodeClose(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode);
......
......@@ -344,6 +344,8 @@ struct SVnode {
bool blocked;
bool restored;
tsem_t syncSem;
int32_t blockSec;
int64_t blockSeq;
SQHandle* pQuery;
};
......
......@@ -22,7 +22,8 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
tsem_wait(&pVnode->syncSem);
}
......@@ -202,12 +203,16 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
#else
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
}
taosThreadMutexUnlock(&pVnode->lock);
......@@ -606,6 +611,25 @@ void vnodeSyncClose(SVnode *pVnode) {
syncStop(pVnode->sync);
}
void vnodeSyncCheckTimeout(SVnode *pVnode) {
vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pVnode->blockSec;
if (delta > VNODE_TIMEOUT_SEC) {
syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq);
vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
pVnode->blocked = false;
pVnode->blockSec = 0;
pVnode->blockSeq = 0;
tsem_post(&pVnode->syncSem);
}
}
taosThreadMutexUnlock(&pVnode->lock);
}
bool vnodeIsRoleLeader(SVnode *pVnode) {
SSyncState state = syncGetState(pVnode->sync);
return state.state == TAOS_SYNC_STATE_LEADER;
......
......@@ -215,7 +215,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode);
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData);
......
......@@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
}
syncNodeStartHeartbeatTimer(pSyncNode);
//syncNodeReplicate(pSyncNode);
// syncNodeReplicate(pSyncNode);
}
syncNodeRelease(pSyncNode);
......@@ -218,6 +218,18 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret;
}
void syncSendTimeoutRsp(int64_t rid, int64_t seq) {
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
SRpcMsg rpcMsg = {0};
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
syncNodeRelease(pNode);
rpcSendResponse(&rpcMsg);
}
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
......@@ -538,7 +550,7 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader;
int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
rpcFreeCont(rpcMsg.pCont);
return ret;
}
......@@ -670,19 +682,19 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode);
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sError("sync propose error");
return -1;
}
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
syncNodeRelease(pSyncNode);
return ret;
}
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
......@@ -739,6 +751,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
}
if (seq != NULL) *seq = seqNum;
return code;
}
}
......
......@@ -337,7 +337,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
......
......@@ -249,7 +249,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
......
......@@ -189,7 +189,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
......
......@@ -396,7 +396,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册