提交 473e134f 编写于 作者: M Minghao Li

refactor(sync): add resp ttl clean

上级 96f9274f
...@@ -26,6 +26,8 @@ extern "C" { ...@@ -26,6 +26,8 @@ extern "C" {
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 5000
#define SYNC_MAX_BATCH_SIZE 500 #define SYNC_MAX_BATCH_SIZE 500
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
......
...@@ -427,13 +427,22 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c ...@@ -427,13 +427,22 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; if (cbMeta.code == 0) {
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
rpcMsg.info.conn.applyIndex = cbMeta.index; syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyTerm = cbMeta.term; rpcMsg.info.conn.applyIndex = cbMeta.index;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), pMsg->msgType,
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
} }
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
......
...@@ -1055,19 +1055,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1055,19 +1055,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
} }
// tools // tools
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, 0); pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
ASSERT(pSyncNode->pSyncRespMgr != NULL); ASSERT(pSyncNode->pSyncRespMgr != NULL);
// restore state // restore state
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
// pSyncNode->pSnapshot = NULL;
// if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
// pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot);
// }
// tsem_init(&(pSyncNode->restoreSem), 0, 0);
// snapshot senders // snapshot senders
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
......
...@@ -122,54 +122,43 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { ...@@ -122,54 +122,43 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
int cnt = 0; int cnt = 0;
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
SArray *delIndexArray = taosArrayInit(0, sizeof(SyncIndex)); SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
ASSERT(delIndexArray != NULL); ASSERT(delIndexArray != NULL);
while (pStub) { while (pStub) {
size_t len; size_t len;
void * key = taosHashGetKey(pStub, &len); void *key = taosHashGetKey(pStub, &len);
SyncIndex *pIndex = (SyncIndex *)key; uint64_t *pSeqNum = (uint64_t *)key;
int64_t nowMS = taosGetTimestampMs(); int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) { if (nowMS - pStub->createTime > ttl) {
taosArrayPush(delIndexArray, pIndex); taosArrayPush(delIndexArray, pSeqNum);
cnt++; cnt++;
SSyncRaftEntry *pEntry = NULL; SFsmCbMeta cbMeta = {0};
int32_t code = 0; cbMeta.index = SYNC_INDEX_INVALID;
if (pSyncNode->pLogStore != NULL) { cbMeta.lastConfigIndex = SYNC_INDEX_INVALID;
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, *pIndex, &pEntry); cbMeta.isWeak = false;
if (code == 0 && pEntry != NULL) { cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
SFsmCbMeta cbMeta = {0}; cbMeta.state = pSyncNode->state;
cbMeta.index = pEntry->index; cbMeta.seqNum = *pSeqNum;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, cbMeta.index); cbMeta.term = SYNC_TERM_INVALID;
cbMeta.isWeak = pEntry->isWeak; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.code = TSDB_CODE_SYN_TIMEOUT; cbMeta.flag = 0;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum; pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0;
SRpcMsg rpcMsg = pStub->rpcMsg;
rpcMsg.pCont = rpcMallocCont(pEntry->dataLen);
memcpy(rpcMsg.pCont, pEntry->data, pEntry->dataLen);
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
syncEntryDestory(pEntry);
}
}
} }
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub); pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
} }
int32_t arraySize = taosArrayGetSize(delIndexArray); int32_t arraySize = taosArrayGetSize(delIndexArray);
sDebug("vgId:%d, resp clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize); sDebug("vgId:%d, resp mgr clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
SyncIndex *pIndex = taosArrayGet(delIndexArray, i); uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
taosHashRemove(pObj->pRespHash, pIndex, sizeof(SyncIndex)); taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
sDebug("vgId:%d, resp mgr clean by ttl, seq:%d", pSyncNode->vgId, *pSeqNum);
} }
taosArrayDestroy(delIndexArray); taosArrayDestroy(delIndexArray);
} }
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncElection.h" #include "syncElection.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncRespMgr.h"
int32_t syncNodeTimerRoutine(SSyncNode* ths) { int32_t syncNodeTimerRoutine(SSyncNode* ths) {
syncNodeEventLog(ths, "timer routines ... "); syncNodeEventLog(ths, "timer routines ... ");
syncRespClean(ths->pSyncRespMgr);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册