提交 848fd584 编写于 作者: S Shengliang Guan

enh: adjust sync propose

上级 4ddd25a2
......@@ -656,64 +656,57 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
}
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
return -1;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
// not restored, vnode enable
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
// not restored, vnode enable
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
return -1;
}
int32_t ret = 0;
SyncClientRequest* pSyncMsg;
// optimized one replica
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
pSyncMsg = syncClientRequestBuild(pMsg, 0, isWeak, pSyncNode->vgId);
SyncIndex retIndex;
int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
if (code == 0) {
pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
ret = 1;
sTrace("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType));
} else {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
goto _END;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
}
} else {
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId);
SRpcMsg rpcMsg = {0};
pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId);
SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
// optimized one replica
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
SyncIndex retIndex;
int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
if (code == 0) {
pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1;
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
}
} else {
ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
}
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr());
}
syncClientRequestDestroy(pSyncMsg);
goto _END;
} else {
ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state),
TMSG_INFO(pMsg->msgType));
goto _END;
}
_END:
syncClientRequestDestroy(pSyncMsg);
return ret;
}
......@@ -2522,7 +2515,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index);
ASSERT(pEntry != NULL);
LRUHandle* h = NULL;
......
......@@ -836,8 +836,6 @@ SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->seqNum = 0;
pMsg->isWeak = false;
pMsg->dataLen = dataLen;
return pMsg;
}
......
......@@ -178,29 +178,11 @@ void syncUtilMsgNtoH(void* msg) {
pHead->vgId = ntohl(pHead->vgId);
}
bool syncUtilUserPreCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
return false;
}
bool syncUtilUserCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
return false;
}
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
bool syncUtilUserRollback(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
return false;
}
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册