提交 eddd5b39 编写于 作者: S Shengliang Guan

refactor: adjust vm worker

上级 7ff24189
...@@ -22,21 +22,19 @@ ...@@ -22,21 +22,19 @@
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle; SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
int32_t code = -1; switch (pMsg->msgType) {
tmsg_t msgType = pMsg->msgType;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType));
switch (msgType) {
case TDMT_MON_VM_INFO: case TDMT_MON_VM_INFO:
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
break; break;
...@@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dError("msg:%p, not processed in vnode queue", pMsg); dError("msg:%p, not processed in vnode queue", pMsg);
} }
if (msgType & 1u) { if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) { if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
...@@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for (int i = 0; i < taosArrayGetSize(pArray); i++) { for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0}; SRpcMsg rsp = {.info = pMsg->info};
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
...@@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) { } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// ok
// send response in applyQ // send response in applyQ
} else { } else {
assert(0); assert(0);
...@@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// init response rpc msg // init response rpc msg
rsp.code = 0; SRpcMsg rsp = {0};
rsp.pCont = NULL;
rsp.contLen = 0;
// get original rpc msg // get original rpc msg
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
...@@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont(originalRpcMsg.pCont); rpcFreeCont(originalRpcMsg.pCont);
// if leader, send response // if leader, send response
// if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
if (pMsg->info.handle != NULL) { if (pMsg->info.handle != NULL) {
rsp.info = pMsg->info; rsp.info = pMsg->info;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
...@@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
SRpcMsg *pRsp = NULL; if (code != 0) {
int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
if (ret != 0) {
// if leader, send response
if (pMsg->info.handle != NULL) { if (pMsg->info.handle != NULL) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {
rsp.code = terrno; .code = (terrno < 0) ? terrno : code,
rsp.info = pMsg->info; .info = pMsg->info,
dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr()); };
dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr());
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} }
...@@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, get from vnode-merge queue", pMsg); dTrace("msg:%p, get from vnode-merge queue", pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册