diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index e7086626d6f11c07c8389322a40761bb66a740a7..2ca77fa404d31b64fecc98521fe6e3697bd39b4f 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -172,7 +172,7 @@ void syncCleanUp() { int64_t syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; - SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); + SSyncNode *pNode = calloc(sizeof(SSyncNode), 1); if (pNode == NULL) { sError("no memory to allocate syncNode"); terrno = TAOS_SYSTEM_ERROR(errno); @@ -207,8 +207,8 @@ int64_t syncStart(const SSyncInfo *pInfo) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); if (pNode->peerInfo[i] == NULL) { - sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, pNodeInfo->nodeFqdn, - pNodeInfo->nodePort); + sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, + pNodeInfo->nodeFqdn, pNodeInfo->nodePort); syncStop(pNode->rid); exit(1); } @@ -419,7 +419,7 @@ void syncRecover(int64_t rid) { pthread_mutex_lock(&(pNode->mutex)); for (int32_t i = 0; i < pNode->replica; ++i) { - pPeer = (SSyncPeer *)pNode->peerInfo[i]; + pPeer = pNode->peerInfo[i]; if (pPeer->peerFd >= 0) { syncRestartConnection(pPeer); } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 393b6b09b1b096c0c942f09e0309ab86dd023086..d3dbe3a32db1baf6669d52a1df5fad785113dfd3 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -140,6 +140,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { if (buffer == NULL) return -1; SWalHead *pHead = (SWalHead *)buffer; + uint64_t lastVer = 0; while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); @@ -153,7 +154,14 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); if (ret < 0) break; - sDebug("%s, restore a record, qtype:wal hver:%" PRIu64, pPeer->id, pHead->version); + sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); + + if (lastVer != 0 && lastVer == pHead->version) { + sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer); + break; + } + lastVer = pHead->version; + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); }