syncTestTool.cpp 13.9 KB
Newer Older
M
Minghao Li 已提交
1
#include <gtest/gtest.h>
2
#include "syncTest.h"
M
Minghao Li 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

void logTest() {
  sTrace("--- sync log test: trace");
  sDebug("--- sync log test: debug");
  sInfo("--- sync log test: info");
  sWarn("--- sync log test: warn");
  sError("--- sync log test: error");
  sFatal("--- sync log test: fatal");
}

uint16_t    gPorts[] = {7000, 7001, 7002, 7003, 7004};
const char* gDir = "./syncTestTool";
int32_t     gVgId = 1234;
SyncIndex   gSnapshotLastApplyIndex;
SyncIndex   gSnapshotLastApplyTerm;
int         gIterTimes = 0;

M
Minghao Li 已提交
20 21 22
SyncIndex gFinishLastApplyIndex;
SyncIndex gFinishLastApplyTerm;

M
Minghao Li 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
void init() {
  int code = walInit();
  assert(code == 0);

  code = syncInit();
  assert(code == 0);
}

void cleanup() { walCleanUp(); }

void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
36 37 38
           "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
           ", term:%" PRIu64
           " "
S
Shengliang Guan 已提交
39
           "currentTerm:%" PRIu64 " \n",
40 41
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
           cbMeta.term, cbMeta.currentTerm);
M
Minghao Li 已提交
42 43 44 45 46 47
  syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}

void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
48 49 50
           "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
           ", term:%" PRIu64
           " "
S
Shengliang Guan 已提交
51
           "currentTerm:%" PRIu64 " \n",
52 53
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
           cbMeta.term, cbMeta.currentTerm);
M
Minghao Li 已提交
54 55 56 57 58 59
  syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}

void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
60 61 62
           "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
           ", term:%" PRIu64
           " "
S
Shengliang Guan 已提交
63
           "currentTerm:%" PRIu64 " \n",
64 65
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
           cbMeta.term, cbMeta.currentTerm);
M
Minghao Li 已提交
66 67 68 69 70 71 72 73 74 75
  syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}

int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
  pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm;
  return 0;
}

76
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) {
M
Minghao Li 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
  *ppReader = (void*)0xABCD;
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
  sTrace("%s", logBuf);
  return 0;
}

int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopRead== pFsm:%p, pReader:%p", pFsm, pReader);
  sTrace("%s", logBuf);
  return 0;
}

int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) {
  static int readIter = 0;

  if (readIter == gIterTimes) {
    *len = 0;
    *ppBuf = NULL;
  } else if (readIter < gIterTimes) {
    *len = 20;
    *ppBuf = taosMemoryMalloc(*len);
    snprintf((char*)*ppBuf, *len, "data iter:%d", readIter);
  }

  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d, *ppBuf:[%s], readIter:%d", pFsm, pReader,
           *len, (char*)(*ppBuf), readIter);
  sTrace("%s", logBuf);

  readIter++;
  return 0;
}

113
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
M
Minghao Li 已提交
114 115
  *ppWriter = (void*)0xCDEF;
  char logBuf[256] = {0};
M
Minghao Li 已提交
116

M
Minghao Li 已提交
117 118 119 120 121
  snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
  sTrace("%s", logBuf);
  return 0;
}

122
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot) {
M
Minghao Li 已提交
123
  if (isApply) {
M
Minghao Li 已提交
124 125
    gSnapshotLastApplyIndex = gFinishLastApplyIndex;
    gSnapshotLastApplyTerm = gFinishLastApplyTerm;
M
Minghao Li 已提交
126 127
  }

M
Minghao Li 已提交
128 129
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
130 131
           "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%" PRId64
           ", "
S
Shengliang Guan 已提交
132
           "gSnapshotLastApplyTerm:%" PRId64,
M
Minghao Li 已提交
133 134 135
           pFsm, pWriter, isApply, gSnapshotLastApplyIndex, gSnapshotLastApplyTerm);
  sTrace("%s", logBuf);

M
Minghao Li 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148
  return 0;
}

int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotDoWrite== pFsm:%p, pWriter:%p, len:%d pBuf:[%s]", pFsm,
           pWriter, len, (char*)pBuf);
  sTrace("%s", logBuf);
  return 0;
}

void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); }

S
Shengliang Guan 已提交
149
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
M
Minghao Li 已提交
150
  char* s = syncCfg2Str(&(cbMeta->newCfg));
151 152
  sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64
         ", newCfg:%s",
S
Shengliang Guan 已提交
153
         cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s);
M
Minghao Li 已提交
154 155 156
  taosMemoryFree(s);
}

M
Minghao Li 已提交
157 158 159
void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256] = {0};
  snprintf(logBuf, sizeof(logBuf),
160 161 162
           "==callback== ==LeaderTransferCb== pFsm:%p, index:%" PRId64
           ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64
           " "
S
Shengliang Guan 已提交
163
           "currentTerm:%" PRIu64 " \n",
164 165
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
           cbMeta.term, cbMeta.currentTerm);
M
Minghao Li 已提交
166 167 168
  syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}

M
Minghao Li 已提交
169 170 171 172
SSyncFSM* createFsm() {
  SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
  memset(pFsm, 0, sizeof(*pFsm));

M
Minghao Li 已提交
173
#if 0
M
Minghao Li 已提交
174 175 176 177 178
  pFsm->FpCommitCb = CommitCb;
  pFsm->FpPreCommitCb = PreCommitCb;
  pFsm->FpRollBackCb = RollBackCb;

  pFsm->FpReConfigCb = ReConfigCb;
179
  pFsm->FpGetSnapshotInfo = GetSnapshotCb;
M
Minghao Li 已提交
180 181 182 183 184 185 186 187 188
  pFsm->FpRestoreFinishCb = RestoreFinishCb;

  pFsm->FpSnapshotStartRead = SnapshotStartRead;
  pFsm->FpSnapshotStopRead = SnapshotStopRead;
  pFsm->FpSnapshotDoRead = SnapshotDoRead;
  pFsm->FpSnapshotStartWrite = SnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = SnapshotDoWrite;

M
Minghao Li 已提交
189
  pFsm->FpLeaderTransferCb = LeaderTransferCb;
M
Minghao Li 已提交
190
#endif
M
Minghao Li 已提交
191

M
Minghao Li 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
  return pFsm;
}

SWal* createWal(char* path, int32_t vgId) {
  SWalCfg walCfg;
  memset(&walCfg, 0, sizeof(SWalCfg));
  walCfg.vgId = vgId;
  walCfg.fsyncPeriod = 1000;
  walCfg.retentionPeriod = 1000;
  walCfg.rollPeriod = 1000;
  walCfg.retentionSize = 1000;
  walCfg.segSize = 1000;
  walCfg.level = TAOS_WAL_FSYNC;
  SWal* pWal = walOpen(path, &walCfg);
  assert(pWal != NULL);
  return pWal;
}

M
Minghao Li 已提交
210
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy,
M
Minghao Li 已提交
211
                       ESyncStrategy enableSnapshot) {
M
Minghao Li 已提交
212 213 214
  SSyncInfo syncInfo;
  syncInfo.vgId = vgId;
  syncInfo.msgcb = &gSyncIO->msgcb;
S
Shengliang Guan 已提交
215 216
  syncInfo.syncSendMSg = syncIOSendMsg;
  syncInfo.syncEqMsg = syncIOEqMsg;
M
Minghao Li 已提交
217 218 219 220
  syncInfo.pFsm = createFsm();
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
  syncInfo.pWal = pWal;
  syncInfo.isStandBy = isStandBy;
M
Minghao Li 已提交
221
  syncInfo.snapshotStrategy = enableSnapshot;
M
Minghao Li 已提交
222 223 224

  SSyncCfg* pCfg = &syncInfo.syncCfg;

M
Minghao Li 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237
#if 0
  {
    pCfg->myIndex = myIndex;
    pCfg->replicaNum = replicaNum;

    for (int i = 0; i < replicaNum; ++i) {
      pCfg->nodeInfo[i].nodePort = gPorts[i];
      taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
      // snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
    }
  }
#endif

M
Minghao Li 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
  if (isStandBy) {
    pCfg->myIndex = 0;
    pCfg->replicaNum = 1;
    pCfg->nodeInfo[0].nodePort = gPorts[myIndex];
    taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);

  } else {
    pCfg->myIndex = myIndex;
    pCfg->replicaNum = replicaNum;

    for (int i = 0; i < replicaNum; ++i) {
      pCfg->nodeInfo[i].nodePort = gPorts[i];
      taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
      // snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
    }
  }

  int64_t rid = syncOpen(&syncInfo);
  assert(rid > 0);

  SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
  assert(pSyncNode != NULL);
  gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
  gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
  gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
  gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
  gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
  gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
  gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
  gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
M
Minghao Li 已提交
268 269
  gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot;
  gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply;
M
Minghao Li 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294

  gSyncIO->pSyncNode = pSyncNode;
  syncNodeRelease(pSyncNode);

  return rid;
}

void configChange(int64_t rid, int32_t newReplicaNum, int32_t myIndex) {
  SSyncCfg syncCfg;

  syncCfg.myIndex = myIndex;
  syncCfg.replicaNum = newReplicaNum;

  for (int i = 0; i < newReplicaNum; ++i) {
    syncCfg.nodeInfo[i].nodePort = gPorts[i];
    taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn);
  }

  syncReconfig(rid, &syncCfg);
}

void usage(char* exe) {
  printf(
      "usage: %s  replicaNum(1-5)  myIndex(0-..)  enableSnapshot(0/1)  lastApplyIndex(>=-1)  lastApplyTerm(>=0)  "
      "writeRecordNum(>=0)  "
M
Minghao Li 已提交
295 296
      "isStandBy(0/1)  isConfigChange(0-5)  iterTimes(>=0)  finishLastApplyIndex(>=-1)  finishLastApplyTerm(>=0) "
      "leaderTransfer(0/1) \n",
M
Minghao Li 已提交
297 298 299 300 301 302
      exe);
}

SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
  SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
  memset(pMsg, 0, sizeof(SRpcMsg));
303
  pMsg->msgType = TDMT_VND_SUBMIT;
M
Minghao Li 已提交
304 305
  pMsg->contLen = 256;
  pMsg->pCont = rpcMallocCont(pMsg->contLen);
M
Minghao Li 已提交
306
  snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count,
307
           taosGetTimestampMs());
M
Minghao Li 已提交
308 309 310 311 312 313
  return pMsg;
}

int main(int argc, char** argv) {
  sprintf(tsTempDir, "%s", ".");
  tsAsyncLog = 0;
M
Minghao Li 已提交
314
  sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR + DEBUG_DEBUG;
M
Minghao Li 已提交
315

M
Minghao Li 已提交
316
  if (argc != 13) {
M
Minghao Li 已提交
317 318 319 320
    usage(argv[0]);
    exit(-1);
  }

321 322 323 324 325 326 327 328 329 330 331 332
  int32_t       replicaNum = atoi(argv[1]);
  int32_t       myIndex = atoi(argv[2]);
  ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
  int32_t       lastApplyIndex = atoi(argv[4]);
  int32_t       lastApplyTerm = atoi(argv[5]);
  int32_t       writeRecordNum = atoi(argv[6]);
  bool          isStandBy = atoi(argv[7]);
  int32_t       isConfigChange = atoi(argv[8]);
  int32_t       iterTimes = atoi(argv[9]);
  int32_t       finishLastApplyIndex = atoi(argv[10]);
  int32_t       finishLastApplyTerm = atoi(argv[11]);
  int32_t       leaderTransfer = atoi(argv[12]);
M
Minghao Li 已提交
333

M
Minghao Li 已提交
334
  sInfo(
M
Minghao Li 已提交
335
      "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
M
Minghao Li 已提交
336 337
      "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d, "
      "leaderTransfer:%d",
M
Minghao Li 已提交
338
      replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange,
M
Minghao Li 已提交
339
      iterTimes, finishLastApplyIndex, finishLastApplyTerm, leaderTransfer);
M
Minghao Li 已提交
340 341 342

  // check parameter
  assert(replicaNum >= 1 && replicaNum <= 5);
343
  // assert(myIndex >= 0 && myIndex < replicaNum);
M
Minghao Li 已提交
344 345 346 347 348
  assert(lastApplyIndex >= -1);
  assert(lastApplyTerm >= 0);
  assert(writeRecordNum >= 0);
  assert(isConfigChange >= 0 && isConfigChange <= 5);
  assert(iterTimes >= 0);
M
Minghao Li 已提交
349 350
  assert(finishLastApplyIndex >= -1);
  assert(finishLastApplyTerm >= 0);
M
Minghao Li 已提交
351 352 353 354 355 356 357 358 359 360

  char logFile[256];
  snprintf(logFile, sizeof(logFile), "/tmp/%s-replicaNum%d-myIndex%d.log", gDir, replicaNum, myIndex);
  taosInitLog(logFile, 100);
  sTrace("logFile : %s", logFile);

  gSnapshotLastApplyIndex = lastApplyIndex;
  gSnapshotLastApplyTerm = lastApplyTerm;
  gIterTimes = iterTimes;

M
Minghao Li 已提交
361 362 363
  gFinishLastApplyIndex = finishLastApplyIndex;
  gFinishLastApplyTerm = finishLastApplyTerm;

M
Minghao Li 已提交
364 365 366 367 368 369 370 371
  init();
  int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
  assert(ret == 0);

  char walPath[128];
  snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex);
  SWal* pWal = createWal(walPath, gVgId);

372
  int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy, enableSnapshot);
M
Minghao Li 已提交
373 374 375 376 377 378 379 380 381 382 383 384
  assert(rid > 0);
  syncStart(rid);

  SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
  assert(pSyncNode != NULL);

  if (isConfigChange > 0) {
    configChange(rid, isConfigChange, myIndex);
  }

  //---------------------------
  int32_t alreadySend = 0;
M
Minghao Li 已提交
385
  int32_t leaderTransferWait = 0;
M
Minghao Li 已提交
386 387 388
  while (1) {
    char* simpleStr = syncNode2SimpleStr(pSyncNode);

M
Minghao Li 已提交
389 390
    leaderTransferWait++;
    if (leaderTransferWait == 7) {
391 392 393 394
      if (leaderTransfer) {
        sTrace("begin leader transfer ...");
        int32_t ret = syncLeaderTransfer(rid);
      }
M
Minghao Li 已提交
395 396
    }

M
Minghao Li 已提交
397 398 399
    if (alreadySend < writeRecordNum) {
      SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
      int32_t  ret = syncPropose(rid, pRpcMsg, false);
M
Minghao Li 已提交
400
      if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
M
Minghao Li 已提交
401
        sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
M
Minghao Li 已提交
402 403
      } else {
        assert(ret == 0);
M
Minghao Li 已提交
404
        sTrace("%s value%d write ok, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
M
Minghao Li 已提交
405 406 407 408 409 410
      }
      alreadySend++;

      rpcFreeCont(pRpcMsg->pCont);
      taosMemoryFree(pRpcMsg);
    } else {
M
Minghao Li 已提交
411
      sTrace("%s, leaderTransferWait:%d", simpleStr, leaderTransferWait);
M
Minghao Li 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
    }

    taosMsleep(1000);
    taosMemoryFree(simpleStr);
    taosMsleep(1000);
  }

  syncNodeRelease(pSyncNode);
  syncStop(rid);
  walClose(pWal);
  syncIOStop();
  cleanup();
  taosCloseLog();
  return 0;
}