提交 60bab9ae 编写于 作者: S Shengliang Guan

refact: post sem in vnode while sync timeout

上级 9af36669
......@@ -240,7 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
void syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
......@@ -322,31 +322,32 @@ struct STsdbKeepCfg {
};
struct SVnode {
char* path;
SVnodeCfg config;
SVState state;
SVStatis statis;
STfs* pTfs;
SMsgCb msgCb;
TdThreadMutex mutex;
TdThreadCond poolNotEmpty;
SVBufPool* pPool;
SVBufPool* inUse;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t sync;
TdThreadMutex lock;
bool blocked;
bool restored;
tsem_t syncSem;
int32_t blockSec;
int64_t blockSeq;
SQHandle* pQuery;
char* path;
SVnodeCfg config;
SVState state;
SVStatis statis;
STfs* pTfs;
SMsgCb msgCb;
TdThreadMutex mutex;
TdThreadCond poolNotEmpty;
SVBufPool* pPool;
SVBufPool* inUse;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t sync;
TdThreadMutex lock;
bool blocked;
bool restored;
tsem_t syncSem;
int32_t blockSec;
int64_t blockSeq;
SRpcHandleInfo blockInfo;
SQHandle* pQuery;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
......@@ -32,8 +32,11 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
pVnode->blocked = false;
pVnode->blockSec = 0;
pVnode->blockSeq = 0;
tsem_post(&pVnode->syncSem);
}
taosThreadMutexUnlock(&pVnode->lock);
......@@ -213,6 +216,7 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
pVnode->blockInfo = pMsg->info;
}
taosThreadMutexUnlock(&pVnode->lock);
......@@ -279,12 +283,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, blocking msg obtained from apply-queue. index:%" PRId64 ", term: %" PRId64 ", type: %s", vgId,
pMsg->info.conn.applyIndex, pMsg->info.conn.applyTerm, TMSG_INFO(pMsg->msgType));
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
", blocking msg obtained sec:%d seq:%" PRId64,
vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
pVnode->blockSeq);
} else {
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
}
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
......@@ -618,9 +625,14 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pVnode->blockSec;
if (delta > VNODE_TIMEOUT_SEC) {
syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq);
vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
}
pVnode->blocked = false;
pVnode->blockSec = 0;
pVnode->blockSeq = 0;
......
......@@ -218,9 +218,9 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret;
}
void syncSendTimeoutRsp(int64_t rid, int64_t seq) {
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (pNode == NULL) return -1;
SRpcMsg rpcMsg = {0};
int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
......@@ -228,9 +228,13 @@ void syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode);
if (ret == 1) {
sInfo("send response since sync timeout, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
return 0;
} else {
sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
return -1;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册