提交 a7807d3e 编写于 作者: B Benguang Zhao

fix: synchronize syncPropose for executing blocking msgs in vnodeProposeMsg

上级 7eca2127
......@@ -66,6 +66,10 @@ extern int32_t tMsgDict[];
typedef uint16_t tmsg_t;
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL);
}
/* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type
#define TSDB_IE_TYPE_SEC 1
......
......@@ -381,6 +381,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
#define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -181,14 +181,15 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (version <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied);
terrno = TSDB_CODE_VND_DUP_REQUEST;
pRsp->info.handle = NULL;
return -1;
}
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
pVnode->state.applied = version;
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
......
......@@ -18,14 +18,15 @@
#define BATCH_ENABLE 0
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_wait(&pVnode->syncSem);
}
static inline void vnodeWaitBlockMsgOld(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock);
......@@ -115,21 +116,31 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
if (*arrSize <= 0) return;
SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
bool wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]);
} else {
} else if (code < 0) {
if (terrno != 0) code = terrno;
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleProposeError(pVnode, pMsgArr[i], code);
}
}
if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
pLastMsg = NULL;
for (int32_t i = 0; i < *arrSize; ++i) {
SRpcMsg *pMsg = pMsgArr[i];
const STraceId *trace = &pMsg->info.traceId;
......@@ -206,16 +217,23 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
#else
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg);
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
} else if (code < 0) {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
return code;
}
......@@ -274,6 +292,11 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, blocking msg obtained from apply-queue. index:%" PRId64 ", term: %" PRId64 ", type: %s", vgId,
pMsg->info.conn.applyIndex, pMsg->info.conn.applyTerm, TMSG_INFO(pMsg->msgType));
}
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
......
......@@ -2638,7 +2638,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
(*pRetIndex) = index;
}
return syncNodeAppend(ths, pEntry);
int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
ASSERT(false && "failed to append blocking msg");
}
return code;
}
return -1;
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "syncPipeline.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftEntry.h"
......@@ -442,6 +443,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
return 0;
}
if (pNode->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
sTrace("vgId:%d, blocking msg ready to execute. index:%" PRId64 ", term: %" PRId64 ", type: %s", pNode->vgId,
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
}
SRpcMsg rpcMsg = {0};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
......@@ -514,9 +520,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
}
continue;
}
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
sError("vgId:%d, failed to commit sync log entry. index:%" PRId64 ", term:%" PRId64
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role: %d, current term: %" PRId64,
vgId, pEntry->index, pEntry->term, role, term);
goto _out;
......
......@@ -326,6 +326,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exis
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册