提交 6874de8e 编写于 作者: S Shengliang Guan

TD-1842

上级 50cdd465
...@@ -211,7 +211,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -211,7 +211,7 @@ static void *dnodeProcessWriteQueue(void *param) {
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version); taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
......
...@@ -66,8 +66,8 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue ...@@ -66,8 +66,8 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
......
...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return terrno; return terrno;
} }
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
if (pVnode->version == 0) { if (pVnode->version == 0) {
pVnode->version = walGetVersion(pVnode->wal); pVnode->version = walGetVersion(pVnode->wal);
} }
......
...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) { ...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
} }
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = param; SVnodeObj * pVnode = vparam;
SWalHead * pHead = pWrite->pHead; SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { ...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write into WAL // write into WAL
...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { ...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) return code; if (code < 0) return code;
return syncCode; return syncCode;
...@@ -204,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -204,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) { int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam; SWalHead * pHead = wparam;
...@@ -219,8 +220,8 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) ...@@ -219,8 +220,8 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg)
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
if (pMsg != NULL) { if (rparam != NULL) {
SRpcMsg *pRpcMsg = pMsg; SRpcMsg *pRpcMsg = rparam;
pWrite->rpcHandle = pRpcMsg->handle; pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle; pWrite->rpcAhandle = pRpcMsg->ahandle;
} }
......
...@@ -305,8 +305,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -305,8 +305,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId, wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId,
pWal->version, pHead->version, pHead->len); pWal->version, pHead->version, pHead->len);
if (pWal->keep) pWal->version = pHead->version; pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册