diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 7f1a5b2580b8d34ceb00398c19e7f6660faea606..2f9e9a0af9d6e573892a8e8cd01e3075b2a2bf7b 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -192,13 +192,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) { dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); + code = TSDB_CODE_SUCCESS; } SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, .pCont = pRead->rspRet.rsp, .contLen = pRead->rspRet.len, - .code = pRead->rspRet.code, + .code = code, }; rpcSendResponse(&rpcRsp); @@ -216,7 +217,7 @@ static void *dnodeProcessReadQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); + dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 53533b818388b7ff44674dbd8c841b0c347b3a55..e61364355d4a23dca823ed27a0876ee179a6fe19 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -216,7 +216,7 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; - dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); + dTrace("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); } else { pHead = (SWalHead *)item; } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index f4fb8060feabc12e0f6bfcb933572ed4090ebb6e..4d482afa02af892d2c8ebddb995bf7950a33a4ea 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -30,7 +30,6 @@ typedef enum _VN_STATUS { typedef struct { int len; - int code; void *rsp; void *qhandle; //used by query and retrieve msg } SRspRet; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 2cf72bb15d68bbceb45eb4816b4b3f29099e8acb..f198c2ffe417ae918b14e291b7390466c62b11a5 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -39,15 +39,21 @@ void vnodeInitReadFp(void) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { SVnodeObj *pVnode = (SVnodeObj *)param; - if (vnodeProcessReadMsgFp[msgType] == NULL) + if (vnodeProcessReadMsgFp[msgType] == NULL) { + vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); return TSDB_CODE_VND_MSG_NOT_PROCESSED; + } - if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) + if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) { + vTrace("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); return TSDB_CODE_VND_INVALID_VGROUP_ID; + } // TODO: Later, let slave to support query - if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + vTrace("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; + } return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); } @@ -60,11 +66,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont qinfo_t pQInfo = NULL; if (contLen != 0) { - pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - pRsp->code = pRet->code; + pRsp->code = code; pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; @@ -74,9 +80,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont assert(pCont != NULL); pQInfo = pCont; code = TSDB_CODE_VND_ACTION_IN_PROGRESS; + vTrace("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); } if (pQInfo != NULL) { + vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo); qTableQuery(pQInfo); // do execute query } @@ -88,18 +96,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); - int32_t code = TSDB_CODE_SUCCESS; - vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); - pRet->code = qRetrieveQueryResultInfo(pQInfo); - if (pRet->code != TSDB_CODE_SUCCESS) { + int32_t code = qRetrieveQueryResultInfo(pQInfo); + if (code != TSDB_CODE_SUCCESS) { //TODO pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); } else { // todo check code and handle error in build result set - pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); + code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); if (qHasMoreResultsToRetrieve(pQInfo)) { pRet->qhandle = pQInfo;