提交 4ddd25a2 编写于 作者: S Shengliang Guan

enh: adjust sync propose

上级 46e55ba9
...@@ -182,9 +182,8 @@ typedef struct SyncClientRequest { ...@@ -182,9 +182,8 @@ typedef struct SyncClientRequest {
char data[]; // origin RpcMsg.pCont char data[]; // origin RpcMsg.pCont
} SyncClientRequest; } SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pMsg, uint64_t seqNum, bool isWeak, int32_t vgId); // step 1
int32_t vgId); // step 1
void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
......
...@@ -669,13 +669,11 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -669,13 +669,11 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
goto _END; goto _END;
} }
SRespStub stub; SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
stub.createTime = taosGetTimestampMs(); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
stub.rpcMsg = *pMsg;
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId);
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
// optimized one replica // optimized one replica
...@@ -696,12 +694,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -696,12 +694,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
} else { } else {
if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
ret = 0; if (ret != 0) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
} }
...@@ -2322,7 +2317,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { ...@@ -2322,7 +2317,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
uint32_t entryLen; uint32_t entryLen;
char* serialized = syncEntrySerialize(pEntry, &entryLen); char* serialized = syncEntrySerialize(pEntry, &entryLen);
SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); SyncClientRequest* pSyncMsg = syncClientRequestAlloc(entryLen);
ASSERT(pSyncMsg->dataLen == entryLen); ASSERT(pSyncMsg->dataLen == entryLen);
memcpy(pSyncMsg->data, serialized, entryLen); memcpy(pSyncMsg->data, serialized, entryLen);
......
...@@ -831,10 +831,9 @@ void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { ...@@ -831,10 +831,9 @@ void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
} }
// ---- message process SyncClientRequest---- // ---- message process SyncClientRequest----
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncClientRequest) + dataLen; uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = taosMemoryMalloc(bytes); SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes; pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->seqNum = 0; pMsg->seqNum = 0;
...@@ -844,8 +843,8 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { ...@@ -844,8 +843,8 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
} }
// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak // step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) {
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); SyncClientRequest* pMsg = syncClientRequestAlloc(pOriginalRpcMsg->contLen);
pMsg->vgId = vgId; pMsg->vgId = vgId;
pMsg->originalRpcType = pOriginalRpcMsg->msgType; pMsg->originalRpcType = pOriginalRpcMsg->msgType;
pMsg->seqNum = seqNum; pMsg->seqNum = seqNum;
...@@ -891,7 +890,6 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) ...@@ -891,7 +890,6 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len)
// step 2. SyncClientRequest => RpcMsg, to queue // step 2. SyncClientRequest => RpcMsg, to queue
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType; pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes; pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
......
...@@ -81,7 +81,7 @@ void test4() { ...@@ -81,7 +81,7 @@ void test4() {
void test5() { void test5() {
SyncApplyMsg *pMsg = createMsg(); SyncApplyMsg *pMsg = createMsg();
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncApplyMsg2RpcMsg(pMsg, &rpcMsg); syncApplyMsg2RpcMsg(pMsg, &rpcMsg);
SyncApplyMsg *pMsg2 = syncApplyMsgFromRpcMsg2(&rpcMsg); SyncApplyMsg *pMsg2 = syncApplyMsgFromRpcMsg2(&rpcMsg);
syncApplyMsgLog2((char *)"test5: syncClientRequest2RpcMsg -> syncApplyMsgFromRpcMsg2 ", pMsg2); syncApplyMsgLog2((char *)"test5: syncClientRequest2RpcMsg -> syncApplyMsgFromRpcMsg2 ", pMsg2);
......
...@@ -59,7 +59,7 @@ void test2() { ...@@ -59,7 +59,7 @@ void test2() {
uint32_t len = pMsg->bytes; uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len); char * serialized = (char *)taosMemoryMalloc(len);
syncClientRequestSerialize(pMsg, serialized, len); syncClientRequestSerialize(pMsg, serialized, len);
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen);
syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestDeserialize(serialized, len, pMsg2);
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
......
...@@ -21,7 +21,7 @@ SyncClientRequest *createMsg() { ...@@ -21,7 +21,7 @@ SyncClientRequest *createMsg() {
rpcMsg.contLen = 20; rpcMsg.contLen = 20;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
strcpy((char *)rpcMsg.pCont, "hello rpc"); strcpy((char *)rpcMsg.pCont, "hello rpc");
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return pMsg; return pMsg;
} }
...@@ -37,7 +37,7 @@ void test2() { ...@@ -37,7 +37,7 @@ void test2() {
uint32_t len = pMsg->bytes; uint32_t len = pMsg->bytes;
char *serialized = (char *)taosMemoryMalloc(len); char *serialized = (char *)taosMemoryMalloc(len);
syncClientRequestSerialize(pMsg, serialized, len); syncClientRequestSerialize(pMsg, serialized, len);
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen);
syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestDeserialize(serialized, len, pMsg2);
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
...@@ -60,7 +60,7 @@ void test3() { ...@@ -60,7 +60,7 @@ void test3() {
void test4() { void test4() {
SyncClientRequest *pMsg = createMsg(); SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen); SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen);
syncClientRequestFromRpcMsg(&rpcMsg, pMsg2); syncClientRequestFromRpcMsg(&rpcMsg, pMsg2);
...@@ -73,7 +73,7 @@ void test4() { ...@@ -73,7 +73,7 @@ void test4() {
void test5() { void test5() {
SyncClientRequest *pMsg = createMsg(); SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg); SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg);
syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2); syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2);
......
...@@ -102,12 +102,12 @@ SRpcMsg *step0() { ...@@ -102,12 +102,12 @@ SRpcMsg *step0() {
} }
SyncClientRequest *step1(const SRpcMsg *pMsg) { SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
return pRetMsg; return pRetMsg;
} }
SRpcMsg *step2(const SyncClientRequest *pMsg) { SRpcMsg *step2(const SyncClientRequest *pMsg) {
SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryCalloc(sizeof(SRpcMsg), 1);
syncClientRequest2RpcMsg(pMsg, pRetMsg); syncClientRequest2RpcMsg(pMsg, pRetMsg);
return pRetMsg; return pRetMsg;
} }
......
...@@ -32,7 +32,7 @@ void test1() { ...@@ -32,7 +32,7 @@ void test1() {
} }
void test2() { void test2() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10);
pSyncMsg->originalRpcType = 33; pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11; pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1; pSyncMsg->isWeak = 1;
...@@ -46,7 +46,7 @@ void test2() { ...@@ -46,7 +46,7 @@ void test2() {
} }
void test3() { void test3() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10);
pSyncMsg->originalRpcType = 33; pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11; pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1; pSyncMsg->isWeak = 1;
......
...@@ -47,7 +47,7 @@ SyncClientRequest *createSyncClientRequest() { ...@@ -47,7 +47,7 @@ SyncClientRequest *createSyncClientRequest() {
rpcMsg.contLen = 20; rpcMsg.contLen = 20;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
strcpy((char *)rpcMsg.pCont, "hello rpc"); strcpy((char *)rpcMsg.pCont, "hello rpc");
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000);
return pMsg; return pMsg;
} }
...@@ -156,7 +156,7 @@ void test7() { ...@@ -156,7 +156,7 @@ void test7() {
void test8() { void test8() {
SyncClientRequest *pMsg = createSyncClientRequest(); SyncClientRequest *pMsg = createSyncClientRequest();
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncClientRequest2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgLog2((char *)"test8", &rpcMsg); syncRpcMsgLog2((char *)"test8", &rpcMsg);
syncClientRequestDestroy(pMsg); syncClientRequestDestroy(pMsg);
......
...@@ -162,7 +162,7 @@ SRpcMsg *step0() { ...@@ -162,7 +162,7 @@ SRpcMsg *step0() {
} }
SyncClientRequest *step1(const SRpcMsg *pMsg) { SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
return pRetMsg; return pRetMsg;
} }
...@@ -206,7 +206,7 @@ int main(int argc, char **argv) { ...@@ -206,7 +206,7 @@ int main(int argc, char **argv) {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
......
...@@ -140,7 +140,7 @@ SRpcMsg *step0() { ...@@ -140,7 +140,7 @@ SRpcMsg *step0() {
} }
SyncClientRequest *step1(const SRpcMsg *pMsg) { SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000);
return pRetMsg; return pRetMsg;
} }
...@@ -181,7 +181,7 @@ int main(int argc, char **argv) { ...@@ -181,7 +181,7 @@ int main(int argc, char **argv) {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册