diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cb1b19ca5e6bd2d8c5a405f21321529a9f4a79dd..22a3ff539aee1db3bc62a35800fdd568c83566ac 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -656,64 +656,57 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { } int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; - sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType)); + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); + return -1; + } - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - // not restored, vnode enable - if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) { + // not restored, vnode enable + if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) { + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, + TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); + return -1; + } + + int32_t ret = 0; + SyncClientRequest* pSyncMsg; + + // optimized one replica + if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { + pSyncMsg = syncClientRequestBuild(pMsg, 0, isWeak, pSyncNode->vgId); + + SyncIndex retIndex; + int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex); + if (code == 0) { + pMsg->info.conn.applyIndex = retIndex; + pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; + ret = 1; + sTrace("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); + } else { ret = -1; - terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; - sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, - pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); - goto _END; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType)); } - + } else { SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); - SRpcMsg rpcMsg = {0}; + pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - // optimized one replica - if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { - SyncIndex retIndex; - int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex); - if (code == 0) { - pMsg->info.conn.applyIndex = retIndex; - pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; - rpcFreeCont(rpcMsg.pCont); - syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); - ret = 1; - sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType)); - } else { - ret = -1; - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType)); - } - } else { - ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); - if (ret != 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); - } + sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType)); + ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); + if (ret != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr()); } - - syncClientRequestDestroy(pSyncMsg); - goto _END; - - } else { - ret = -1; - terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state), - TMSG_INFO(pMsg->msgType)); - goto _END; } -_END: + syncClientRequestDestroy(pSyncMsg); return ret; } @@ -2522,7 +2515,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncTerm term = ths->pRaftStore->currentTerm; - SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index); ASSERT(pEntry != NULL); LRUHandle* h = NULL; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index e052a7a4fae15cd601c7fbacc13cce9d2dec94b0..64536935762819d0d67296e5e09c0cd27bd58f05 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -836,8 +836,6 @@ SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); pMsg->bytes = bytes; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pMsg->seqNum = 0; - pMsg->isWeak = false; pMsg->dataLen = dataLen; return pMsg; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 31c649b2f33ef5f5fc791d690b337521d4eba210..4f6f5bdc33ab4a21d2aed88da29b94a84bcf453b 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -178,29 +178,11 @@ void syncUtilMsgNtoH(void* msg) { pHead->vgId = ntohl(pHead->vgId); } -bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } - - return false; -} - -bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } - - return false; -} +bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } -bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } +bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } - return false; -} +bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex);