diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 6997d0a666555e1237b7f5e019368c822ed2a6b1..6573de2987ef0c134be9a29563fda602f01f6e00 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -1051,7 +1051,10 @@ static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, voi memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len); pRow->rowData = pRow->pHead->cont; - return sdbWriteToQueue(pRow, qtype); + int32_t code = sdbWriteToQueue(pRow, qtype); + if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) code = 0; + + return code; } static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) { diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index a5e268cdd262ee1cd4bae6433de9c7c764e6561a..606760c928cdaea3c3fae3b80573b29404bfafbd 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -195,7 +195,11 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) { } lastVer = pHead->version; - (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); + ret = (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); + if (ret != 0) { + sError("%s, failed to restore record since %s, hver:%" PRIu64, pPeer->id, tstrerror(ret), pHead->version); + break; + } } if (code < 0) { diff --git a/src/vnode/inc/vnodeStatus.h b/src/vnode/inc/vnodeStatus.h index 00ac47df65fba91b9d7ef4b92ab5210de3652330..910a6d71b201fcdc9fbc1daa99fb57d0227d6093 100644 --- a/src/vnode/inc/vnodeStatus.h +++ b/src/vnode/inc/vnodeStatus.h @@ -37,6 +37,7 @@ bool vnodeSetResetStatus(SVnodeObj* pVnode); bool vnodeInInitStatus(SVnodeObj* pVnode); bool vnodeInReadyStatus(SVnodeObj* pVnode); +bool vnodeInReadyOrUpdatingStatus(SVnodeObj* pVnode); bool vnodeInClosingStatus(SVnodeObj* pVnode); bool vnodeInResetStatus(SVnodeObj* pVnode); diff --git a/src/vnode/src/vnodeStatus.c b/src/vnode/src/vnodeStatus.c index 76618ffca792f02a0e47addb9dd9494348cc04f5..0bff062f09710e36c8bc94b7808d38d0e54b56c7 100644 --- a/src/vnode/src/vnodeStatus.c +++ b/src/vnode/src/vnodeStatus.c @@ -135,6 +135,18 @@ bool vnodeInReadyStatus(SVnodeObj* pVnode) { return in; } +bool vnodeInReadyOrUpdatingStatus(SVnodeObj* pVnode) { + bool in = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_READY || pVnode->status == TAOS_VN_STATUS_UPDATING) { + in = true; + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return in; +} + bool vnodeInClosingStatus(SVnodeObj* pVnode) { bool in = false; pthread_mutex_lock(&pVnode->statusMutex); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b65251508d0667459d1e148c3d2c58a79c0fb215..2dc814b8f39058cb25a9bed2decc838fbf848e88 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -242,17 +242,18 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { if (pWrite->qtype == TAOS_QTYPE_RPC) { int32_t code = vnodeCheckWrite(pVnode); if (code != TSDB_CODE_SUCCESS) { + vError("vgId:%d, failed to write into vwqueue since %s", pVnode->vgId, tstrerror(code)); taosFreeQitem(pWrite); vnodeRelease(pVnode); return code; } } - if (!vnodeInReadyStatus(pVnode)) { - vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], - pVnode->refCount, pVnode); + if (!vnodeInReadyOrUpdatingStatus(pVnode)) { + vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId, + vnodeStatus[pVnode->status], pVnode->refCount, pVnode); taosFreeQitem(pWrite); - vnodeRelease(pVnode); + vnodeRelease(pVnode); return TSDB_CODE_APP_NOT_READY; }