diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index f571cfda9f638a52c943cf17f53928669f243206..34df11adcce72a6a21718b81c1895c30c66bea1c 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -92,33 +92,23 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - taos_queue queue = vnodeAcquireRqueue(pHead->vgId); - - if (queue == NULL) { - leftLen -= pHead->contLen; - pCont -= pHead->contLen; - continue; + void *pVnode = vnodeAcquire(pHead->vgId); + if (pVnode != NULL) { + int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg); + if (code == TSDB_CODE_SUCCESS) queuedMsgNum++; + vnodeRelease(pVnode); } - // put message into queue - SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg)); - pRead->rpcMsg = *pMsg; - pRead->pCont = pCont; - pRead->contLen = pHead->contLen; - - // next vnode leftLen -= pHead->contLen; pCont -= pHead->contLen; - queuedMsgNum++; - - taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); } if (queuedMsgNum == 0) { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rpcRsp); - rpcFreeCont(pMsg->pCont); } + + rpcFreeCont(pMsg->pCont); } void *dnodeAllocVReadQueue(void *pVnode) { @@ -162,50 +152,48 @@ void dnodeFreeVReadQueue(void *rqueue) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { - .handle = pRead->rpcMsg.handle, + .handle = pRead->rpcHandle, .pCont = pRead->rspRet.rsp, .contLen = pRead->rspRet.len, .code = code, }; rpcSendResponse(&rpcRsp); - rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); } void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) { - rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); } static void *dnodeProcessReadQueue(void *param) { - SVReadMsg *pReadMsg; + SVReadMsg *pRead; int32_t qtype; void * pVnode; while (1) { - if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) { + if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) { dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset); break; } - dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle, - taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg); + dDebug("%p, msg:%p:%s will be processed in vread queue, qtype:%d", pRead->rpcAhandle, pRead, + taosMsg[pRead->msgType], qtype); - int32_t code = vnodeProcessRead(pVnode, pReadMsg); + int32_t code = vnodeProcessRead(pVnode, pRead); if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { - dnodeSendRpcVReadRsp(pVnode, pReadMsg, code); + dnodeSendRpcVReadRsp(pVnode, pRead, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { - dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); + dnodeSendRpcVReadRsp(pVnode, pRead, pRead->code); } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client - assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); - dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); + assert(pRead->rpcHandle == NULL || (pRead->rpcHandle != NULL && pRead->msgType == 5)); + dnodeDispatchNonRspMsg(pVnode, pRead, code); } } - taosFreeQitem(pReadMsg); + taosFreeQitem(pRead); } return NULL; diff --git a/src/inc/vnode.h b/src/inc/vnode.h index e77bec926af65c1d6a7061c17384934ca1fddd87..018e96e1938336809fc8829be42ed3e946c6ac98 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -37,10 +37,15 @@ typedef struct { } SRspRet; typedef struct { - SRspRet rspRet; - void * pCont; + int32_t code; int32_t contLen; - SRpcMsg rpcMsg; + void * rpcHandle; + void * rpcAhandle; + void * qhandle; + int8_t qtype; + int8_t msgType; + SRspRet rspRet; + char pCont[]; } SVReadMsg; typedef struct { @@ -62,13 +67,11 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); void* vnodeAcquire(int32_t vgId); // add refcount -void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); -int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); void vnodeBuildStatusMsg(void *param); void vnodeConfirmForward(void *param, uint64_t version, int32_t code); @@ -77,8 +80,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); int32_t vnodeInitResources(); void vnodeCleanupResources(); -int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg); -int32_t vnodeCheckRead(void *pVnode); +int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam); +int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 2dcdba5d7c544de68ec4de9b7bc4e455d88805a0..128da72623bc44f948ad0691e6e1d576f6ae8e13 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -468,21 +468,6 @@ void *vnodeAcquire(int32_t vgId) { return *ppVnode; } -void *vnodeAcquireRqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) return NULL; - - int32_t code = vnodeCheckRead(pVnode); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]); - vnodeRelease(pVnode); - return NULL; - } - - return pVnode->rqueue; -} - void *vnodeGetWal(void *pVnode) { return ((SVnodeObj *)pVnode)->wal; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index faa35b0e02d783c1d0b415a60a77c6e98618041e..fb984f67504fbf89630dc474278a513ab8d7a4f8 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -46,7 +46,7 @@ void vnodeInitReadFp(void) { // int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) { SVnodeObj *pVnode = (SVnodeObj *)param; - int msgType = pReadMsg->rpcMsg.msgType; + int32_t msgType = pReadMsg->msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); @@ -56,7 +56,7 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -int32_t vnodeCheckRead(void *param) { +static int32_t vnodeCheckRead(void *param) { SVnodeObj *pVnode = param; if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], @@ -78,24 +78,58 @@ int32_t vnodeCheckRead(void *param) { return TSDB_CODE_SUCCESS; } -static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { - int32_t code = vnodeCheckRead(pVnode); - if (code != TSDB_CODE_SUCCESS) return code; - SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg)); - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - pRead->pCont = qhandle; - pRead->contLen = 0; - pRead->rpcMsg.ahandle = ahandle; +int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { + SVnodeObj *pVnode = vparam; - atomic_add_fetch_32(&pVnode->refCount, 1); + if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) { + int32_t code = vnodeCheckRead(pVnode); + if (code != TSDB_CODE_SUCCESS) return code; + } + + int32_t size = sizeof(SVReadMsg) + contLen; + SVReadMsg *pRead = taosAllocateQitem(size); + if (pRead == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + if (rparam != NULL) { + SRpcMsg *pRpcMsg = rparam; + pRead->rpcHandle = pRpcMsg->handle; + pRead->rpcAhandle = pRpcMsg->ahandle; + pRead->msgType = pRpcMsg->msgType; + pRead->code = pRpcMsg->code; + } + + if (contLen != 0) { + pRead->contLen = contLen; + memcpy(pRead->pCont, pCont, contLen); + } else { + pRead->qhandle = pCont; + } + + pRead->qtype = qtype; - vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); - taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); + atomic_add_fetch_32(&pVnode->refCount, 1); + vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + taosWriteQitem(pVnode->rqueue, qtype, pRead); return TSDB_CODE_SUCCESS; } +static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + rpcMsg.ahandle = ahandle; + + int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg); + if (code == TSDB_CODE_SUCCESS) { + vDebug("QInfo:%p add to vread queue for exec query", *qhandle); + } + + return code; +} + /** * * @param pRet response message object @@ -155,18 +189,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (pReadMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); - assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); + vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcHandle); + assert(pReadMsg->contLen > 0 && killQueryMsg->free == 1); void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle, - pReadMsg->rpcMsg.handle); + pReadMsg->rpcHandle); } else { assert(*qhandle == (void *)killQueryMsg->qhandle); @@ -208,9 +242,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } if (handle != NULL && - vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, - pReadMsg->rpcMsg.handle); + pReadMsg->rpcHandle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); return pRsp->code; @@ -221,7 +255,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle); + code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcHandle); if (code != TSDB_CODE_SUCCESS) { pRsp->code = code; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -230,7 +264,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } } else { assert(pCont != NULL); - void **qhandle = (void **)pCont; + void **qhandle = (void **)pReadMsg->qhandle; vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); @@ -242,14 +276,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { // build query rsp, the retrieve request has reached here already if (buildRes) { // update the connection info according to the retrieve connection - pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle); - assert(pReadMsg->rpcMsg.handle != NULL); + pReadMsg->rpcHandle = qGetResultRetrieveMsg(*qhandle); + assert(pReadMsg->rpcHandle != NULL); vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, - pReadMsg->rpcMsg.handle); + pReadMsg->rpcHandle); // set the real rsp error code - pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle); + pReadMsg->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcHandle); // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client code = TSDB_CODE_QRY_HAS_RSP; @@ -283,7 +317,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { pRetrieve->qhandle = htobe64(pRetrieve->qhandle); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, - pRetrieve->free, pReadMsg->rpcMsg.handle); + pRetrieve->free, pReadMsg->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); @@ -314,9 +348,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { } // register the qhandle to connect to quit query immediate if connection is broken - if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, - pReadMsg->rpcMsg.handle); + if (vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcHandle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -326,7 +359,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { bool freeHandle = true; bool buildRes = false; - code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle); + code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcHandle); if (code != TSDB_CODE_SUCCESS) { // TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -337,7 +370,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { assert(buildRes == true); #if _NON_BLOCKING_RETRIEVE if (!buildRes) { - assert(pReadMsg->rpcMsg.handle != NULL); + assert(pReadMsg->rpcHandle != NULL); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; @@ -345,7 +378,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { #endif // ahandle is the sqlObj pointer - code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle); + code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcHandle); } // If qhandle is not added into vread queue, the query should be completed already or paused with error. diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 80c68b09178372d30381e0e471bb2ed4fba7349c..3caee2fb0c6469f8325906c06ba5b598ebc161b9 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -97,7 +97,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara return syncCode; } -int32_t vnodeCheckWrite(void *param) { +static int32_t vnodeCheckWrite(void *param) { SVnodeObj *pVnode = param; if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);