diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index aaf1098d40ad04a1e3e31ca3e0d23b249f6f9b6a..9b58d7bb6187d5dcafc40fb7287b6a4f6dd71512 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -211,7 +211,7 @@ static void *dnodeProcessWriteQueue(void *param) { dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version); - pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite); + pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 8b3b4b6ed027b38dc44987e47a675e7d2937456f..09273d3907bfcdf80b06788e891a2a70112dc86a 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -66,8 +66,8 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); -int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); -int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); +int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); +int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); void vnodeBuildStatusMsg(void *param); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 49b7e4e7124e6408393be40de29b43a2488b9822..a13addb27cf79c1a083114fe47cfc726e6bfbb93 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { return terrno; } - walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); + walRestore(pVnode->wal, pVnode, vnodeProcessWrite); if (pVnode->version == 0) { pVnode->version = walGetVersion(pVnode->wal); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 0d521b4d2ecf03a1ee433ee4086cf1f08f902f31..417448423ae82ec9633df755c278d60c0ea8696d 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } -int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { - int32_t code = 0; - SVnodeObj *pVnode = param; - SWalHead * pHead = pWrite->pHead; +int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { + int32_t code = 0; + SVnodeObj * pVnode = vparam; + SWalHead * pHead = wparam; + SRspRet * pRspRet = rparam; if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); @@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype); + syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype); if (syncCode < 0) return syncCode; // write into WAL @@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { pVnode->version = pHead->version; // write data locally - code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet); + code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet); if (code < 0) return code; return syncCode; @@ -204,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } -int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) { +int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; @@ -219,8 +220,8 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) return TSDB_CODE_VND_OUT_OF_MEMORY; } - if (pMsg != NULL) { - SRpcMsg *pRpcMsg = pMsg; + if (rparam != NULL) { + SRpcMsg *pRpcMsg = rparam; pWrite->rpcHandle = pRpcMsg->handle; pWrite->rpcAhandle = pRpcMsg->ahandle; } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index d85f740597ad2a5c38bbe8564c6835136d0c3d66..470607e965141043ec44d9c26d243cc814c164d3 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -305,8 +305,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId, pWal->version, pHead->version, pHead->len); - if (pWal->keep) pWal->version = pHead->version; - + pWal->version = pHead->version; (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); }