提交 64f4325b 编写于 作者: M Minghao Li

refactor(sync): modify propose batch interface

上级 a75745de
...@@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid); ...@@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
......
...@@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch { ...@@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch {
char data[]; // block2, block3 char data[]; // block2, block3
} SyncClientRequestBatch; } SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId); int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
......
...@@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode); ...@@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
// option // option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
......
...@@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { ...@@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return ret; return ret;
} }
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
if (arrSize < 0) { if (arrSize < 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
...@@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_ ...@@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
for (int32_t i = 0; i < arrSize; ++i) { for (int32_t i = 0; i < arrSize; ++i) {
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) { if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) {
return false; return false;
} }
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
return false; return false;
} }
} }
...@@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { ...@@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
return true; return true;
} }
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
if (!syncNodeBatchOK(pMsgArr, arrSize)) { if (!syncNodeBatchOK(pMsgPArr, arrSize)) {
syncNodeErrorLog(pSyncNode, "sync propose batch error"); syncNodeErrorLog(pSyncNode, "sync propose batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR; terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1; return -1;
...@@ -738,14 +738,14 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe ...@@ -738,14 +738,14 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
for (int i = 0; i < arrSize; ++i) { for (int i = 0; i < arrSize; ++i) {
SRespStub stub; SRespStub stub;
stub.createTime = taosGetTimestampMs(); stub.createTime = taosGetTimestampMs();
stub.rpcMsg = pMsgArr[i]; stub.rpcMsg = *(pMsgPArr[i]);
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
raftArr[i].isWeak = pIsWeakArr[i]; raftArr[i].isWeak = pIsWeakArr[i];
raftArr[i].seqNum = seqNum; raftArr[i].seqNum = seqNum;
} }
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId); SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -759,7 +759,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe ...@@ -759,7 +759,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg); SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg);
ASSERT(arrSize == pSyncMsg->dataCount); ASSERT(arrSize == pSyncMsg->dataCount);
for (int i = 0; i < arrSize; ++i) { for (int i = 0; i < arrSize; ++i) {
pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex; pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum); syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum);
} }
...@@ -860,7 +860,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -860,7 +860,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END; goto _END;
} }
......
...@@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { ...@@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
// block2: SRaftMeta array // block2: SRaftMeta array
// block3: rpc msg array (with pCont) // block3: rpc msg array (with pCont)
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId) { int32_t vgId) {
ASSERT(rpcMsgArr != NULL); ASSERT(rpcMsgPArr != NULL);
ASSERT(arrSize > 0); ASSERT(arrSize > 0);
int32_t dataLen = 0; int32_t dataLen = 0;
...@@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet ...@@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
raftMetaArr[i].seqNum = raftArr[i].seqNum; raftMetaArr[i].seqNum = raftArr[i].seqNum;
// init msgArr // init msgArr
msgArr[i] = rpcMsgArr[i]; msgArr[i] = *(rpcMsgPArr[i]);
} }
return pMsg; return pMsg;
......
...@@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { ...@@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
} }
SyncClientRequestBatch *createMsg() { SyncClientRequestBatch *createMsg() {
SRpcMsg rpcMsgArr[5]; SRpcMsg *rpcMsgPArr[5];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); memset(rpcMsgPArr, 0, sizeof(rpcMsgPArr));
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
SRpcMsg *pRpcMsg = createRpcMsg(i, 20); SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
rpcMsgArr[i] = *pRpcMsg; rpcMsgPArr[i] = pRpcMsg;
taosMemoryFree(pRpcMsg); //taosMemoryFree(pRpcMsg);
} }
SRaftMeta raftArr[5]; SRaftMeta raftArr[5];
...@@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() { ...@@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() {
raftArr[i].isWeak = i % 2; raftArr[i].isWeak = i % 2;
} }
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234); SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgPArr, raftArr, 5, 1234);
return pMsg; return pMsg;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册