提交 24a3c817 编写于 作者: M Minghao Li

refactor(sync): add batch propose

上级 376bf46a
......@@ -26,6 +26,7 @@ extern "C" {
extern bool gRaftDetailLog;
#define SYNC_MAX_BATCH_SIZE 100
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
......@@ -179,6 +180,7 @@ typedef struct SSyncInfo {
bool isStandBy;
bool snapshotEnable;
SyncGroupId vgId;
int32_t batchSize;
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
SWal* pWal;
......
......@@ -245,6 +245,7 @@ typedef struct SyncClientRequestBatch {
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
......@@ -600,6 +601,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg);
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
......
......@@ -422,6 +422,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -170,6 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
......
......@@ -50,7 +50,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
// process message ----
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
// life cycle
static void syncFreeNode(void* param);
......@@ -627,7 +626,93 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return ret;
}
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { return 0; }
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
if (arrSize < 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
for (int32_t i = 0; i < arrSize; ++i) {
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) {
return false;
}
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
return false;
}
}
return true;
}
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
if (!syncNodeBatchOK(pMsgArr, arrSize)) {
syncNodeErrorLog(pSyncNode, "sync propose batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1;
}
if (arrSize > SYNC_MAX_BATCH_SIZE) {
syncNodeErrorLog(pSyncNode, "sync propose match batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeErrorLog(pSyncNode, "sync propose not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
return -1;
}
if (pSyncNode->changing) {
syncNodeErrorLog(pSyncNode, "sync propose not ready");
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
return -1;
}
SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE];
for (int i = 0; i < arrSize; ++i) {
SRespStub stub;
stub.createTime = taosGetTimestampMs();
stub.rpcMsg = pMsgArr[i];
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
raftArr[i].isWeak = pIsWeakArr[i];
raftArr[i].seqNum = seqNum;
}
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId);
ASSERT(pSyncMsg != NULL);
SRpcMsg rpcMsg;
syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg);
taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
// enqueue msg ok
} else {
sError("enqueue msg error, FpEqMsg is NULL");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
return 0;
}
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
......@@ -2364,6 +2449,49 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
return ret;
}
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) {
int32_t code = 0;
if (ths->state != TAOS_SYNC_STATE_LEADER) {
// call FpCommitCb, delete resp mgr
return -1;
}
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * pMsg->dataCount;
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount;
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pEntry = syncEntryBuild(msgArr[i].contLen);
ASSERT(pEntry != NULL);
pEntry->originalRpcType = msgArr[i].msgType;
pEntry->seqNum = raftMetaArr[i].seqNum;
pEntry->isWeak = raftMetaArr[i].isWeak;
pEntry->term = term;
pEntry->index = index;
memcpy(pEntry->data, msgArr[i].pCont, msgArr[i].contLen);
ASSERT(msgArr[i].contLen == pEntry->dataLen);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
// del resp mgr, call FpCommitCb
ASSERT(0);
return -1;
}
}
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
return 0;
}
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
// inner object already free
......
......@@ -968,8 +968,9 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
ASSERT(arrSize > 0);
int32_t dataLen = 0;
int32_t raftMetaArrayLen = sizeof(SRpcMsg) * arrSize;
int32_t rpcArrayLen = sizeof(SRaftMeta) * arrSize;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize;
dataLen += (raftMetaArrayLen + rpcArrayLen);
uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen;
SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes);
......@@ -995,6 +996,8 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
return pMsg;
}
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {}
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncRequestVote);
......
......@@ -429,6 +429,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
// wal
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册