diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 87b31e4604882d750e4a43fde4a6d818cc862e98..84fd260d91420ea19a796f10fa96d1fbbb1ac648 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -205,7 +205,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version); pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite); - if (pWrite->code <= 0) pWrite->processedCount = 1; + if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1); if (pWrite->code > 0) pWrite->code = 0; if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index e318978a11186a86dff79a5aaea234b9719ce979..92e1ba804be5ab30366594933ebcdaff3bd0727b 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -96,6 +96,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // write into WAL code = walWrite(pVnode->wal, pHead); if (code < 0) { + if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code); return code; } @@ -104,7 +105,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet); - if (code < 0) return code; + if (code < 0) { + if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); + return code; + } return syncCode; }