From 77e03bfd782eb5609e596ea69f90b44967cdc351 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 23 Apr 2023 20:14:49 +0800 Subject: [PATCH] opti:change push mgr to consume msg for subscribe --- source/dnode/mnode/impl/src/mndConsumer.c | 2 - source/dnode/vnode/src/inc/tq.h | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 62 ++++++++++---- source/dnode/vnode/src/tq/tqPush.c | 100 +++++++++++----------- source/dnode/vnode/src/tq/tqUtil.c | 27 +++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +-- 7 files changed, 123 insertions(+), 81 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 65a2fa72a2..ca71e17d7e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -449,7 +449,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); -#if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -463,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } -#endif if (status != MQ_CONSUMER_STATUS__READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index acc0d29382..e1b1092c28 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -100,6 +100,7 @@ typedef struct { SWalRef* pRef; STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec + SRpcMsg* msg; } STqHandle; typedef struct { @@ -114,6 +115,7 @@ struct STQ { int64_t walLogLastVer; SRWLatch lock; SHashObj* pPushMgr; // consumerId -> STqPushEntry + SArray * pPushArray; SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 416bc6cdc7..b24cb7e136 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -214,6 +214,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a78239a4b5..ae4a7e1d61 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,6 +71,11 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } + if(pData->msg != NULL) { + rpcFreeCont(pData->msg->pCont); + taosMemoryFree(pData->msg); + pData->msg = NULL; + } } static void tqPushEntryFree(void* data) { @@ -104,6 +109,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); + pTq->pPushArray = taosArrayInit(8, POINTER_BYTES); + taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); @@ -152,6 +159,7 @@ void tqClose(STQ* pTq) { tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); taosMemoryFree(pTq); + taosArrayDestroy(pTq->pPushArray); } void tqNotifyClose(STQ* pTq) { @@ -350,25 +358,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // 2. check re-balance status -// taosRLockLatch(&pTq->lock); + taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; -// taosRUnLockLatch(&pTq->lock); + taosRUnLockLatch(&pTq->lock); return -1; } -// taosRUnLockLatch(&pTq->lock); - - // 3. update the epoch value -// taosWLockLatch(&pTq->lock); - int32_t savedEpoch = pHandle->epoch; - if (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, - reqEpoch); - pHandle->epoch = reqEpoch; - } -// taosWUnLockLatch(&pTq->lock); + taosRUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); @@ -560,8 +558,20 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - //tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - +// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++) { + void* handle = taosArrayGetP(pTq->pPushArray, i); + if(handle == pHandle) { + tqInfo("vgId:%d remove handle when switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + taosArrayRemove(pTq->pPushArray, i); + break; + } + } + if(pHandle->msg != NULL) { + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); @@ -1067,6 +1077,28 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + tqDebug("vgId:%d start set submit for subscribe", vgId); + + taosWLockLatch(&pTq->lock); + for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ + STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); + if(pHandle->msg == NULL){ + tqError("pHandle->msg should not be null"); + } + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + taosArrayClear(pTq->pPushArray); + // unlock + taosWUnLockLatch(&pTq->lock); + + return 0; +} + int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b7454..d2d17792d3 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -268,59 +268,61 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 } } + int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); - int32_t len = msgLen - sizeof(SSubmitReq2Msg); - int32_t vgId = TD_VID(pTq->pVnode); +// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); +// int32_t len = msgLen - sizeof(SSubmitReq2Msg); +// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { + tqProcessSubmitReqForSubscribe(pTq); // lock push mgr to avoid potential msg lost - taosWLockLatch(&pTq->lock); - - int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); - if (numOfRegisteredPush > 0) { - tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", - vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); - taosWUnLockLatch(&pTq->lock); - return -1; - } - - memcpy(data, pReq, len); - - SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pPushMgr, pIter); - if (pIter == NULL) { - break; - } - - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); - if (pHandle == NULL) { - tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, - pPushEntry->subKey); - continue; - } - - STqExecHandle* pExec = &pHandle->execHandle; - doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); - } - - doRemovePushedEntry(cachedKey, pTq); - taosArrayDestroyEx(cachedKey, freeItem); - taosMemoryFree(data); - } - - // unlock - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// +// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); +// if (numOfRegisteredPush > 0) { +// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", +// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); +// +// void* data = taosMemoryMalloc(len); +// if (data == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); +// taosWUnLockLatch(&pTq->lock); +// return -1; +// } +// +// memcpy(data, pReq, len); +// +// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); +// void* pIter = NULL; +// +// while (1) { +// pIter = taosHashIterate(pTq->pPushMgr, pIter); +// if (pIter == NULL) { +// break; +// } +// +// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; +// +// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); +// if (pHandle == NULL) { +// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, +// pPushEntry->subKey); +// continue; +// } +// +// STqExecHandle* pExec = &pHandle->execHandle; +// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); +// } +// +// doRemovePushedEntry(cachedKey, pTq); +// taosArrayDestroyEx(cachedKey, freeItem); +// taosMemoryFree(data); +// } +// +// // unlock +// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 128ddedf6d..f76e641f2b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -169,22 +169,29 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - // lock -// taosWLockLatch(&pTq->lock); - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. -// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && -// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// taosWUnLockLatch(&pTq->lock); -// return code; -// } +// till now, all data has been transferred to consumer, new data needs to push client once arrived. + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { +// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + // lock + taosWLockLatch(&pTq->lock); + if(pHandle->msg != NULL){ + tqError("pHandle->msg should be null"); + } + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + taosArrayPush(pTq->pPushArray, &pHandle); + taosWUnLockLatch(&pTq->lock); + return code; + } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 579ef8a952..b29081170d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -447,11 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); - //if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ - //vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - //return -1; - //} + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { +// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ + vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } // commit if need if (needCommit) { -- GitLab