From 3e65caccaa07ed9b76232bc7147fcf616318c3bd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 14 May 2022 19:54:18 +0800 Subject: [PATCH] enh(sync): add syncStartStandBy --- source/libs/sync/src/syncIO.c | 2 +- .../libs/sync/test/syncConfigChangeTest.cpp | 39 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 209bd25c2c..1c5bb2914c 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,12 +15,12 @@ #include "syncIO.h" #include +#include "os.h" #include "syncMessage.h" #include "syncUtil.h" #include "tglobal.h" #include "ttimer.h" #include "tutil.h" -#include "os.h" SSyncIO *gSyncIO = NULL; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index e91c6d33fb..4455d25c59 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -1,11 +1,11 @@ #include #include +#include "os.h" #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" #include "syncUtil.h" #include "wal.h" -#include "os.h" void logTest() { sTrace("--- sync log test: trace"); @@ -114,7 +114,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pCfg->myIndex = 0; pCfg->replicaNum = 1; pCfg->nodeInfo[0].nodePort = gPorts[myIndex]; - taosGetFqdn(pCfg->nodeInfo[myIndex].nodeFqdn); + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); } else { pCfg->myIndex = myIndex; @@ -148,7 +148,23 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* return rid; } -void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy \n", exe); } +void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) { + SSyncCfg syncCfg; + + syncCfg.myIndex = myIndex; + syncCfg.replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + syncCfg.nodeInfo[i].nodePort = gPorts[i]; + taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn); + } + + syncReconfig(rid, &syncCfg); +} + +void usage(char* exe) { + printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe); +} SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); @@ -163,7 +179,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; - if (argc != 6) { + if (argc != 7) { usage(argv[0]); exit(-1); } @@ -173,12 +189,15 @@ int main(int argc, char** argv) { int32_t lastApplyIndex = atoi(argv[3]); int32_t writeRecordNum = atoi(argv[4]); bool isStandBy = atoi(argv[5]); + bool isConfigChange = atoi(argv[6]); gSnapshotLastApplyIndex = lastApplyIndex; - assert(replicaNum >= 1 && replicaNum <= 5); - assert(myIndex >= 0 && myIndex < replicaNum); - assert(lastApplyIndex >= -1); - assert(writeRecordNum >= 0); + if (!isStandBy) { + assert(replicaNum >= 1 && replicaNum <= 5); + assert(myIndex >= 0 && myIndex < replicaNum); + assert(lastApplyIndex >= -1); + assert(writeRecordNum >= 0); + } init(); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); @@ -200,6 +219,10 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); + if (isConfigChange) { + configChange(rid, replicaNum, myIndex); + } + //--------------------------- int32_t alreadySend = 0; while (1) { -- GitLab