提交 3e65cacc 编写于 作者: M Minghao Li

enh(sync): add syncStartStandBy

上级 a2d43fb9
...@@ -15,12 +15,12 @@ ...@@ -15,12 +15,12 @@
#include "syncIO.h" #include "syncIO.h"
#include <tdatablock.h> #include <tdatablock.h>
#include "os.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "os.h"
SSyncIO *gSyncIO = NULL; SSyncIO *gSyncIO = NULL;
......
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h> #include <stdio.h>
#include "os.h"
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "wal.h" #include "wal.h"
#include "os.h"
void logTest() { void logTest() {
sTrace("--- sync log test: trace"); sTrace("--- sync log test: trace");
...@@ -114,7 +114,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -114,7 +114,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
pCfg->myIndex = 0; pCfg->myIndex = 0;
pCfg->replicaNum = 1; pCfg->replicaNum = 1;
pCfg->nodeInfo[0].nodePort = gPorts[myIndex]; pCfg->nodeInfo[0].nodePort = gPorts[myIndex];
taosGetFqdn(pCfg->nodeInfo[myIndex].nodeFqdn); taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
} else { } else {
pCfg->myIndex = myIndex; pCfg->myIndex = myIndex;
...@@ -148,7 +148,23 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -148,7 +148,23 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
return rid; return rid;
} }
void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy \n", exe); } void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) {
SSyncCfg syncCfg;
syncCfg.myIndex = myIndex;
syncCfg.replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
syncCfg.nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn);
}
syncReconfig(rid, &syncCfg);
}
void usage(char* exe) {
printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe);
}
SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
...@@ -163,7 +179,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { ...@@ -163,7 +179,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
if (argc != 6) { if (argc != 7) {
usage(argv[0]); usage(argv[0]);
exit(-1); exit(-1);
} }
...@@ -173,12 +189,15 @@ int main(int argc, char** argv) { ...@@ -173,12 +189,15 @@ int main(int argc, char** argv) {
int32_t lastApplyIndex = atoi(argv[3]); int32_t lastApplyIndex = atoi(argv[3]);
int32_t writeRecordNum = atoi(argv[4]); int32_t writeRecordNum = atoi(argv[4]);
bool isStandBy = atoi(argv[5]); bool isStandBy = atoi(argv[5]);
bool isConfigChange = atoi(argv[6]);
gSnapshotLastApplyIndex = lastApplyIndex; gSnapshotLastApplyIndex = lastApplyIndex;
if (!isStandBy) {
assert(replicaNum >= 1 && replicaNum <= 5); assert(replicaNum >= 1 && replicaNum <= 5);
assert(myIndex >= 0 && myIndex < replicaNum); assert(myIndex >= 0 && myIndex < replicaNum);
assert(lastApplyIndex >= -1); assert(lastApplyIndex >= -1);
assert(writeRecordNum >= 0); assert(writeRecordNum >= 0);
}
init(); init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
...@@ -200,6 +219,10 @@ int main(int argc, char** argv) { ...@@ -200,6 +219,10 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
if (isConfigChange) {
configChange(rid, replicaNum, myIndex);
}
//--------------------------- //---------------------------
int32_t alreadySend = 0; int32_t alreadySend = 0;
while (1) { while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册