From a13f9acfba8ef7110a81ad9c4d54689ad8dbc323 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 13 Oct 2020 10:27:05 +0000 Subject: [PATCH] TD-1652 --- src/dnode/src/dnodeVWrite.c | 3 +++ src/inc/taoserror.h | 1 + src/sync/src/syncMain.c | 11 ++++++----- src/vnode/src/vnodeRead.c | 5 +++-- src/vnode/src/vnodeWrite.c | 15 +++++++++++---- 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 3f2c9df222..d07afb17e7 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -244,6 +244,9 @@ static void *dnodeProcessWriteQueue(void *param) { } int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); + dTrace("%p, wal msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, + taosMsg[pHead->msgType], pHead->version, tstrerror(code)); + if (pWrite) { pWrite->rpcMsg.code = code; if (code <= 0) pWrite->processedCount = 1; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 786342b5a6..02ab2dd166 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid sync version") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 9a73348b93..f527a996b0 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -341,18 +341,19 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { pPeer = pNode->peerInfo[i]; syncRestartConnection(pPeer); } + return TSDB_CODE_SYN_INVALID_VERSION; } else { // always update version nodeVersion = pWalHead->version; - sDebug("vgId:%d, update version, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, - nodeRole, qtype, pWalHead->version); + sDebug("vgId:%d, update version, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, + syncRole[nodeRole], qtype, pWalHead->version); + return TSDB_CODE_SUCCESS; } - return 0; } nodeVersion = pWalHead->version; - sDebug("vgId:%d, forward will send, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, - nodeRole, qtype, pWalHead->version); + sDebug("vgId:%d, forward will send, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, + syncRole[nodeRole], qtype, pWalHead->version); // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 58e97075ac..9da9d65009 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -59,8 +59,9 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { // TODO: Later, let slave to support query // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); + if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], + pVnode->syncCfg.replica, syncRole[pVnode->role]); return TSDB_CODE_APP_NOT_READY; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 0c310439bb..0963300047 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -71,7 +71,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); + vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType], + pVnode->syncCfg.replica, syncRole[pVnode->role]); return TSDB_CODE_APP_NOT_READY; } @@ -85,18 +86,24 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pHead->version <= pVnode->version) return 0; } + int64_t lastVersion = pVnode->version; pVnode->version = pHead->version; // write into WAL code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); - if (syncCode < 0) return syncCode; + if (syncCode < 0) { + pVnode->version = lastVersion; + vError("vgId:%d, msgType:%s not processed, reason:%s, wal ver:%" PRIu64 " restore to last ver:%" PRIu64, + pVnode->vgId, taosMsg[pHead->msgType], tstrerror(syncCode), pHead->version, pVnode->version); + return syncCode; + } - // write data locally + // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); if (code < 0) return code; -- GitLab