提交 dc42544e 编写于 作者: M Minghao Li

refactor(sync): add leader, follower call back2

上级 b4c86857
......@@ -163,6 +163,7 @@ typedef struct SSyncNode {
bool changing;
int64_t startTime;
int64_t leaderTime;
int64_t lastReplicateTime;
} SSyncNode;
......
......@@ -32,9 +32,9 @@ typedef struct SRespStub {
} SRespStub;
typedef struct SSyncRespMgr {
SHashObj * pRespHash;
SHashObj *pRespHash;
int64_t ttl;
void * data;
void *data;
TdThreadMutex mutex;
uint64_t seqNum;
} SSyncRespMgr;
......@@ -46,7 +46,8 @@ int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
void syncRespClean(SSyncRespMgr *pObj);
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl);
void syncRespCleanRsp(SSyncRespMgr *pObj);
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
#ifdef __cplusplus
}
......
......@@ -1100,6 +1100,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow;
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;
syncNodeEventLog(pSyncNode, "sync open");
......@@ -2015,6 +2016,8 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -2028,6 +2031,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
// send rsp to client
syncNodeLeaderChangeRsp(pSyncNode);
// call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
......@@ -2068,6 +2074,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();
// reset restoreFinish
pSyncNode->restoreFinish = false;
......@@ -2954,8 +2962,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
}
ths->restoreFinish = true;
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%" PRId64, pEntry->index);
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%ld, elapsed:%ld ms, ", pEntry->index,
restoreDelay);
syncNodeEventLog(ths, eventLog);
}
}
......
......@@ -108,13 +108,19 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
return 0; // get none object
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, -1, true);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, pObj->ttl);
syncRespCleanByTTL(pObj, pObj->ttl, false);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
int sum = 0;
......@@ -126,12 +132,12 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) {
size_t len;
void * key = taosHashGetKey(pStub, &len);
void *key = taosHashGetKey(pStub, &len);
uint64_t *pSeqNum = (uint64_t *)key;
sum++;
int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) {
if (nowMS - pStub->createTime > ttl || -1 == ttl) {
taosArrayPush(delIndexArray, pSeqNum);
cnt++;
......@@ -148,7 +154,14 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0;
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
if (pStub->rpcMsg.info.handle != NULL) {
tmsgSendRsp(&(pStub->rpcMsg));
}
}
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册