提交 eea343c5 编写于 作者: M Minghao Li

enh(sync): add syncTestTool

上级 44994201
......@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
int64_t sendingMS;
......@@ -56,14 +56,14 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SSyncNode *pSyncNode;
......@@ -74,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
......@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param);
static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -90,8 +90,10 @@ int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncIOEqMsg== msgType:%d", pMsg->msgType);
syncRpcMsgLog2(logBuf, pMsg);
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
......@@ -240,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param;
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
......@@ -255,7 +257,7 @@ static void *syncIOConsumerFunc(void *param) {
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "==syncIOConsumerFunc== msgType:%d", pRpcMsg->msgType);
snprintf(logBuf, sizeof(logBuf), "==syncIOConsumMsg== msgType:%d", pRpcMsg->msgType);
syncRpcMsgLog2(logBuf, pRpcMsg);
// use switch case instead of if else
......
......@@ -284,7 +284,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -398,7 +398,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -43,6 +43,7 @@ add_executable(syncSnapshotSendTest "")
add_executable(syncSnapshotRspTest "")
add_executable(syncSnapshotSenderTest "")
add_executable(syncSnapshotReceiverTest "")
add_executable(syncTestTool "")
target_sources(syncTest
......@@ -225,6 +226,10 @@ target_sources(syncSnapshotReceiverTest
PRIVATE
"syncSnapshotReceiverTest.cpp"
)
target_sources(syncTestTool
PRIVATE
"syncTestTool.cpp"
)
target_include_directories(syncTest
......@@ -452,6 +457,11 @@ target_include_directories(syncSnapshotReceiverTest
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncTestTool
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -634,6 +644,10 @@ target_link_libraries(syncSnapshotReceiverTest
sync
gtest_main
)
target_link_libraries(syncTestTool
sync
gtest_main
)
enable_testing()
......
......@@ -50,14 +50,16 @@ void test4() {
}
int main(int argc, char** argv) {
// taosInitLog("tmp/syncTest.log", 100);
taosInitLog("/tmp/syncTest.log", 100);
tsAsyncLog = 0;
sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR;
test1();
test2();
test3();
test4();
/*
if (argc == 2) {
bool bTaosDirExist = taosDirExist(argv[1]);
printf("%s bTaosDirExist:%d \n", argv[1], bTaosDirExist);
......@@ -65,7 +67,8 @@ int main(int argc, char** argv) {
bool bTaosCheckExistFile = taosCheckExistFile(argv[1]);
printf("%s bTaosCheckExistFile:%d \n", argv[1], bTaosCheckExistFile);
}
*/
// taosCloseLog();
taosCloseLog();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "os.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t gPorts[] = {7000, 7001, 7002, 7003, 7004};
const char* gDir = "./syncTestTool";
int32_t gVgId = 1234;
SyncIndex gSnapshotLastApplyIndex;
SyncIndex gSnapshotLastApplyTerm;
int gIterTimes = 0;
void init() {
int code = walInit();
assert(code == 0);
code = syncInit();
assert(code == 0);
}
void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu "
"currentTerm:%lu \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu "
"currentTerm:%lu \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu "
"currentTerm:%lu \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm;
return 0;
}
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) {
*ppReader = (void*)0xABCD;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
sTrace("%s", logBuf);
return 0;
}
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopRead== pFsm:%p, pReader:%p", pFsm, pReader);
sTrace("%s", logBuf);
return 0;
}
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) {
static int readIter = 0;
if (readIter == gIterTimes) {
*len = 0;
*ppBuf = NULL;
} else if (readIter < gIterTimes) {
*len = 20;
*ppBuf = taosMemoryMalloc(*len);
snprintf((char*)*ppBuf, *len, "data iter:%d", readIter);
}
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d, *ppBuf:[%s], readIter:%d", pFsm, pReader,
*len, (char*)(*ppBuf), readIter);
sTrace("%s", logBuf);
readIter++;
return 0;
}
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) {
*ppWriter = (void*)0xCDEF;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
sTrace("%s", logBuf);
return 0;
}
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter,
isApply);
sTrace("%s", logBuf);
if (isApply) {
gSnapshotLastApplyIndex = 10;
gSnapshotLastApplyTerm = 1;
}
return 0;
}
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotDoWrite== pFsm:%p, pWriter:%p, len:%d pBuf:[%s]", pFsm,
pWriter, len, (char*)pBuf);
sTrace("%s", logBuf);
return 0;
}
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&newCfg);
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
taosMemoryFree(s);
}
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpReConfigCb = ReConfigCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotStartRead = SnapshotStartRead;
pFsm->FpSnapshotStopRead = SnapshotStopRead;
pFsm->FpSnapshotDoRead = SnapshotDoRead;
pFsm->FpSnapshotStartWrite = SnapshotStartWrite;
pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
return pFsm;
}
SWal* createWal(char* path, int32_t vgId) {
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
SWal* pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
return pWal;
}
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) {
SSyncInfo syncInfo;
syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
syncInfo.isStandBy = isStandBy;
syncInfo.snapshotEnable = true;
SSyncCfg* pCfg = &syncInfo.syncCfg;
if (isStandBy) {
pCfg->myIndex = 0;
pCfg->replicaNum = 1;
pCfg->nodeInfo[0].nodePort = gPorts[myIndex];
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
} else {
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
// snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
}
}
int64_t rid = syncOpen(&syncInfo);
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncSnapshotSend = pSyncNode->FpOnSnapshotSend;
gSyncIO->FpOnSyncSnapshotRsp = pSyncNode->FpOnSnapshotRsp;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void configChange(int64_t rid, int32_t newReplicaNum, int32_t myIndex) {
SSyncCfg syncCfg;
syncCfg.myIndex = myIndex;
syncCfg.replicaNum = newReplicaNum;
for (int i = 0; i < newReplicaNum; ++i) {
syncCfg.nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn);
}
syncReconfig(rid, &syncCfg);
}
void usage(char* exe) {
printf(
"usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) "
"writeRecordNum(>=0) "
"isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) \n",
exe);
}
SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 256;
pMsg->pCont = rpcMallocCont(pMsg->contLen);
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs());
return pMsg;
}
int main(int argc, char** argv) {
sprintf(tsTempDir, "%s", ".");
tsAsyncLog = 0;
sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR;
if (argc != 10) {
usage(argv[0]);
exit(-1);
}
int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]);
bool enableSnapshot = atoi(argv[3]);
int32_t lastApplyIndex = atoi(argv[4]);
int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]);
bool isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]);
sTrace(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
"isStandBy:%d, isConfigChange:%d, iterTimes:%d",
replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange,
iterTimes);
// check parameter
assert(replicaNum >= 1 && replicaNum <= 5);
assert(myIndex >= 0 && myIndex < replicaNum);
assert(lastApplyIndex >= -1);
assert(lastApplyTerm >= 0);
assert(writeRecordNum >= 0);
assert(isConfigChange >= 0 && isConfigChange <= 5);
assert(iterTimes >= 0);
char logFile[256];
snprintf(logFile, sizeof(logFile), "/tmp/%s-replicaNum%d-myIndex%d.log", gDir, replicaNum, myIndex);
taosInitLog(logFile, 100);
sTrace("logFile : %s", logFile);
gSnapshotLastApplyIndex = lastApplyIndex;
gSnapshotLastApplyTerm = lastApplyTerm;
gIterTimes = iterTimes;
init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
assert(ret == 0);
char walPath[128];
snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex);
SWal* pWal = createWal(walPath, gVgId);
int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy);
assert(rid > 0);
syncStart(rid);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
if (isConfigChange > 0) {
configChange(rid, isConfigChange, myIndex);
}
//---------------------------
int32_t alreadySend = 0;
while (1) {
char* simpleStr = syncNode2SimpleStr(pSyncNode);
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
sTrace("%s value%d write not leader", simpleStr, alreadySend);
} else {
assert(ret == 0);
sTrace("%s value%d write ok", simpleStr, alreadySend);
}
alreadySend++;
rpcFreeCont(pRpcMsg->pCont);
taosMemoryFree(pRpcMsg);
} else {
sTrace("%s", simpleStr);
}
taosMsleep(1000);
taosMemoryFree(simpleStr);
taosMsleep(1000);
}
syncNodeRelease(pSyncNode);
syncStop(rid);
walClose(pWal);
syncIOStop();
cleanup();
taosCloseLog();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册