未验证 提交 5c9e8159 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #11623 from taosdata/feature/vnode_refact1

refactor: vnode
...@@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
int64_t version;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) { if (pArray == NULL) {
...@@ -115,23 +116,32 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -115,23 +116,32 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
} }
vnodePreprocessWriteReqs(pVnode->pImpl, pArray); vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
numOfMsgs = taosArrayGetSize(pArray); numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRsp = NULL; SRpcMsg rsp;
rsp.pCont = NULL;
rsp.contLen = 0;
rsp.code = 0;
rsp.handle = pRpc->handle;
rsp.ahandle = pRpc->ahandle;
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp);
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp); #if 0
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle; pRsp->ahandle = pRpc->ahandle;
tmsgSendRsp(pRsp);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} else { } else {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pVnode->pWrapper, pMsg, code); vmSendRsp(pVnode->pWrapper, pMsg, code);
} }
#endif
} }
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
...@@ -153,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -153,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// todo // todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); // (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
} }
} }
......
...@@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); ...@@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
...@@ -16,34 +16,32 @@ ...@@ -16,34 +16,32 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc; SRpcMsg *pRpc;
*version = pVnode->state.processed;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
pRpc = &pMsg->rpcMsg; pRpc = &pMsg->rpcMsg;
// set request version // set request version
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
int64_t ver = pVnode->state.processed++;
taosEncodeFixedI64(&pBuf, ver);
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
// TODO: handle error
/*ASSERT(false);*/
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
return -1;
} }
} }
walFsync(pVnode->pWal, false); walFsync(pVnode->pWal, false);
return 0;
} }
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL; void *ptr = NULL;
int ret; int ret;
...@@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
// todo: change the interface here // todo: change the interface here
int64_t ver; if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
// TODO: handle error // TODO: handle error
} }
...@@ -69,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -69,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
return 0; return 0;
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp); return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
case TDMT_VND_ALTER_STB: case TDMT_VND_ALTER_STB:
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
...@@ -78,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -78,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
/*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ pRsp->msgType = TDMT_VND_SUBMIT_RSP;
if (pVnode->config.streamMode == 0) { return vnodeProcessSubmitReq(pVnode, ptr, pRsp);
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
(*pRsp)->handle = pMsg->handle;
(*pRsp)->ahandle = pMsg->ahandle;
return vnodeProcessSubmitReq(pVnode, ptr, *pRsp);
}
break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error // TODO: handle error
...@@ -128,7 +119,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -128,7 +119,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
break; break;
} }
pVnode->state.applied = ver; pVnode->state.applied = version;
// Check if it needs to commit // Check if it needs to commit
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
...@@ -222,7 +213,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { ...@@ -222,7 +213,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
return 0; return 0;
} }
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) { static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
SVCreateTbBatchReq vCreateTbBatchReq = {0}; SVCreateTbBatchReq vCreateTbBatchReq = {0};
SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
...@@ -274,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -274,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
taosArrayDestroy(vCreateTbBatchRsp.rspList); taosArrayDestroy(vCreateTbBatchRsp.rspList);
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); pRsp->pCont = msg;
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->contLen = contLen;
(*pRsp)->pCont = msg;
(*pRsp)->contLen = contLen;
(*pRsp)->handle = pMsg->handle;
(*pRsp)->ahandle = pMsg->ahandle;
} }
return 0; return 0;
...@@ -312,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg ...@@ -312,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
} }
// encode the response (TODO) // encode the response (TODO)
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
memcpy(pRsp->pCont, &rsp, sizeof(rsp)); memcpy(pRsp->pCont, &rsp, sizeof(rsp));
pRsp->contLen = sizeof(SSubmitRsp); pRsp->contLen = sizeof(SSubmitRsp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册