diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 589ff470f170f2ff94efe1974b1c755ea8e16bb1..4247468bcae77dab49f61dd684a80ada9701418c 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -36,6 +36,8 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex if (sindex < 0 || eindex < sindex) return; + sDebug("%s, extra files will be removed between sindex:%d and eindex:%d", pPeer->id, sindex, eindex); + while (1) { name[0] = 0; magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion); @@ -61,11 +63,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { bool fileChanged = false; *fversion = 0; - sinfo.index = 0; + sinfo.index = -1; while (1) { // read file info - int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo)); - if (ret != sizeof(SFileInfo)) { + minfo.index = -1; + int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo) || minfo.index == -1) { sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); break; } @@ -75,7 +78,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { sDebug("%s, no more files to restore", pPeer->id); // remove extra files after the current index - syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX); + if (sinfo.index != -1) syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX); code = 0; break; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 7cf1a9059839e2111808b9458b35fb1ad0573f57..80e2dc422e07395f19fa2cad79b37037e3d06f86 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -243,8 +243,10 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { - vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); - taosMsleep(3); + int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; + if (ms > 100) ms = 100; + vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms); + taosMsleep(ms); } code = vnodePerformFlowCtrl(pWrite);