提交 086ec29c 编写于 作者: M Minghao Li

refactor(sync): add SyncClientRequestBatch

上级 d4ab1346
......@@ -191,12 +191,12 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
typedef struct SyncClientRequest {
uint32_t bytes;
int32_t vgId;
uint32_t msgType; // SyncClientRequest msgType
uint32_t originalRpcType; // user RpcMsg msgType
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
uint32_t originalRpcType; // origin RpcMsg msgType
uint64_t seqNum;
bool isWeak;
uint32_t dataLen; // user RpcMsg.contLen
char data[]; // user RpcMsg.pCont
uint32_t dataLen; // origin RpcMsg.contLen
char data[]; // origin RpcMsg.pCont
} SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
......@@ -220,11 +220,6 @@ void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
// ---------------------------------------------
typedef struct SOffsetAndContLen {
int32_t offset;
int32_t contLen;
} SOffsetAndContLen;
typedef struct SRaftMeta {
uint64_t seqNum;
bool isWeak;
......@@ -232,20 +227,33 @@ typedef struct SRaftMeta {
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
// block3: rpc msg array (with pCont pointer)
typedef struct SyncClientRequestBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType; // SyncClientRequestBatch msgType
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
uint32_t dataCount;
uint32_t dataLen; // user RpcMsg.contLen
char data[]; // user RpcMsg.pCont
uint32_t dataLen;
char data[]; // block2, block3
} SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
......@@ -318,12 +326,15 @@ void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
// ---------------------------------------------
// data: entry
typedef struct SyncAppendEntries {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
SyncIndex prevLogIndex;
......@@ -354,18 +365,14 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
// ---------------------------------------------
// define ahead
/*
typedef struct SOffsetAndContLen {
int32_t offset;
int32_t contLen;
} SOffsetAndContLen;
*/
// block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array
// block3: SRpcMsg Array
// block4: SRpcMsg pCont Array
// data:
// block1: SOffsetAndContLen Array
// block2: entry Array
typedef struct SyncAppendEntriesBatch {
uint32_t bytes;
......@@ -382,10 +389,12 @@ typedef struct SyncAppendEntriesBatch {
SyncTerm privateTerm;
int32_t dataCount;
uint32_t dataLen;
char data[];
char data[]; // block1, block2
} SyncAppendEntriesBatch;
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
// SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
......
......@@ -29,19 +29,20 @@ extern "C" {
typedef struct SSyncRaftEntry {
uint32_t bytes;
uint32_t msgType; // SyncClientRequest msgType
uint32_t originalRpcType; // user RpcMsg msgType
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
uint32_t originalRpcType; // origin RpcMsg msgType
uint64_t seqNum;
bool isWeak;
SyncTerm term;
SyncIndex index;
uint32_t dataLen; // user RpcMsg.contLen
char data[]; // user RpcMsg.pCont
uint32_t dataLen; // origin RpcMsg.contLen
char data[]; // origin RpcMsg.pCont
} SSyncRaftEntry;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index);
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
void syncEntryDestory(SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
......
......@@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
SSyncCfg lastConfig;
......@@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//---------------------------------------------------
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSnapshot snapshot;
......@@ -85,8 +85,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//---------------------------------------------------
// on message
......
......@@ -694,6 +694,8 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
return 0;
}
static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) { return true; }
// really pre log match
// prevLogIndex == -1
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
......@@ -844,8 +846,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
} while (0);
// calculate logOK here, before will coredump, due to fake match
// bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
bool logOK = true;
bool logOK = syncNodeOnAppendEntriesBatchLogOK(ths, pMsg);
// not match
//
......@@ -866,8 +867,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
if (condition) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex,
pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
// prepare response msg
......@@ -914,7 +916,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
if (hasExtraEntries) {
// make log same, rollback deleted entries
// code = syncNodeMakeLogSame(ths, pMsg);
code = syncNodeMakeLogSame2(ths, pMsg);
ASSERT(code == 0);
}
......@@ -926,7 +928,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// append entry batch
for (int32_t i = 0; i < retArrSize; ++i) {
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234);
// SSyncRaftEntry* pAppendEntry = syncEntryBuild4(&rpcMsgArr[i], );
SSyncRaftEntry* pAppendEntry;
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
......
......@@ -996,7 +996,135 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
return pMsg;
}
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {}
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pSyncMsg->msgType;
pRpcMsg->contLen = pSyncMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen);
}
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
int32_t arrSize = pMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int i = 0; i < arrSize; ++i) {
if (msgArr[i].pCont != NULL) {
rpcFreeCont(msgArr[i].pCont);
}
}
taosMemoryFree(pMsg);
}
}
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) {
SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data);
return raftMetaArr;
}
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) {
int32_t arrSize = pSyncMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen);
return msgArr;
}
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) {
SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen);
ASSERT(pSyncMsg != NULL);
memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pRpcMsg->contLen == pSyncMsg->bytes);
return pSyncMsg;
}
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg);
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg);
cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pMeta = cJSON_CreateObject();
cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum);
cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak);
cJSON_AddItemToArray(pMetaArr, pMeta);
}
cJSON* pMsgArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]);
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
}
char* s;
s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot);
return pJson;
}
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) {
cJSON* pJson = syncClientRequestBatch2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sTrace("syncClientRequestBatchLog | len:%lu | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sTraceLong("syncClientRequestBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
......@@ -1475,6 +1603,7 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
// block3: SRpcMsg Array
// block4: SRpcMsg pCont Array
/*
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) {
ASSERT(rpcMsgArr != NULL);
ASSERT(arrSize > 0);
......@@ -1521,6 +1650,15 @@ SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t
return pMsg;
}
*/
// block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array
// block3: entry Array
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
return NULL;
}
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
if (pMsg != NULL) {
......
......@@ -50,6 +50,22 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde
return pEntry;
}
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pOriginalMsg->contLen);
ASSERT(pEntry != NULL);
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = pOriginalMsg->msgType;
pEntry->seqNum = 0;
pEntry->isWeak = 0;
pEntry->term = term;
pEntry->index = index;
pEntry->dataLen = pOriginalMsg->contLen;
memcpy(pEntry->data, pOriginalMsg->pCont, pOriginalMsg->contLen);
return pEntry;
}
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
// init rpcMsg
SMsgHead head;
......
......@@ -162,7 +162,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
}
}
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId);
// SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId);
SyncAppendEntriesBatch* pMsg = NULL;
ASSERT(pMsg != NULL);
// prepare msg
......
......@@ -353,14 +353,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64];
......@@ -612,7 +612,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -645,14 +645,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
......@@ -24,6 +24,7 @@ add_executable(syncAppendEntriesTest "")
add_executable(syncAppendEntriesBatchTest "")
add_executable(syncAppendEntriesReplyTest "")
add_executable(syncClientRequestTest "")
add_executable(syncClientRequestBatchTest "")
add_executable(syncTimeoutTest "")
add_executable(syncPingTest "")
add_executable(syncPingReplyTest "")
......@@ -159,6 +160,10 @@ target_sources(syncClientRequestTest
PRIVATE
"syncClientRequestTest.cpp"
)
target_sources(syncClientRequestBatchTest
PRIVATE
"syncClientRequestBatchTest.cpp"
)
target_sources(syncTimeoutTest
PRIVATE
"syncTimeoutTest.cpp"
......@@ -407,6 +412,11 @@ target_include_directories(syncClientRequestTest
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncClientRequestBatchTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncTimeoutTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
......@@ -658,6 +668,10 @@ target_link_libraries(syncClientRequestTest
sync
gtest_main
)
target_link_libraries(syncClientRequestBatchTest
sync
gtest_main
)
target_link_libraries(syncTimeoutTest
sync
gtest_main
......
......@@ -15,6 +15,7 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
/*
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pRpcMsg, 0, sizeof(SRpcMsg));
......@@ -67,7 +68,6 @@ void test1() {
syncAppendEntriesBatchDestroy(pMsg);
}
/*
void test2() {
SyncAppendEntries *pMsg = createMsg();
uint32_t len = pMsg->bytes;
......@@ -126,9 +126,8 @@ int main() {
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
test1();
/*
test1();
test2();
test3();
test4();
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
SyncPing *pSyncMsg = syncPingBuild(20);
snprintf(pSyncMsg->data, pSyncMsg->dataLen, "value_%d", i);
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pRpcMsg, 0, sizeof(SRpcMsg));
pRpcMsg->code = 10 * i;
syncPing2RpcMsg(pSyncMsg, pRpcMsg);
syncPingDestroy(pSyncMsg);
return pRpcMsg;
}
SyncClientRequestBatch *createMsg() {
SRpcMsg rpcMsgArr[5];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
for (int32_t i = 0; i < 5; ++i) {
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
rpcMsgArr[i] = *pRpcMsg;
taosMemoryFree(pRpcMsg);
}
SRaftMeta raftArr[5];
memset(raftArr, 0, sizeof(raftArr));
for (int32_t i = 0; i < 5; ++i) {
raftArr[i].seqNum = i * 10;
raftArr[i].isWeak = i % 2;
}
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234);
return pMsg;
}
void test1() {
SyncClientRequestBatch *pMsg = createMsg();
syncClientRequestBatchLog2((char *)"==test1==", pMsg);
syncClientRequestBatchDestroyDeep(pMsg);
}
/*
void test2() {
SyncClientRequest *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len);
syncClientRequestSerialize(pMsg, serialized, len);
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen);
syncClientRequestDeserialize(serialized, len, pMsg2);
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test3() {
SyncClientRequest *pMsg = createMsg();
uint32_t len;
char * serialized = syncClientRequestSerialize2(pMsg, &len);
SyncClientRequest *pMsg2 = syncClientRequestDeserialize2(serialized, len);
syncClientRequestLog2((char *)"test3: syncClientRequestSerialize3 -> syncClientRequestDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test4() {
SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen);
syncClientRequestFromRpcMsg(&rpcMsg, pMsg2);
syncClientRequestLog2((char *)"test4: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test5() {
SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg);
syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
*/
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
test1();
/*
test2();
test3();
test4();
test5();
*/
return 0;
}
......@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0;
}
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) {
*ppReader = (void*)0xABCD;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
......
......@@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; }
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; }
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
......
......@@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0;
}
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) {
*ppReader = (void*)0xABCD;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
......
......@@ -96,8 +96,8 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
#define TRANS_RETRY_INTERVAL 200 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
......@@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册