From 8572f4a32fa6db44b563d01be8c0ce3aeb271af3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Apr 2023 19:41:30 +0800 Subject: [PATCH] opti:change push mgr to consume msg for subscribe --- source/client/src/clientTmq.c | 4 ++-- source/common/src/tglobal.c | 8 ++++---- source/common/src/tmsg.c | 6 +++--- source/dnode/vnode/src/tq/tq.c | 12 +++++++----- source/dnode/vnode/src/tq/tqUtil.c | 7 ++++++- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 16a4f55840..f05a314e44 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; #if 0 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index aa35b298e6..8cd3d7f5ab 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1274,10 +1274,10 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false); if (taosMulModeMkDir(tsLogDir, 0777) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - printf("failed to create dir:%s since %s", tsLogDir, terrstr()); - cfgCleanup(pCfg); - return -1; +// terrno = TAOS_SYSTEM_ERROR(errno); +// printf("failed to create dir:%s since %s", tsLogDir, terrstr()); +// cfgCleanup(pCfg); +// return -1; } if (taosInitLog(logname, logFileNum) != 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d9802244b7..cd980d028c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5328,9 +5328,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - SMsgHead *pHead = buf; - pHead->vgId = pReq->head.vgId; - pHead->contLen = pReq->head.contLen; +// SMsgHead *pHead = buf; +// pHead->vgId = pReq->head.vgId; +// pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ae4a7e1d61..73c7075d51 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,13 +1084,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosWLockLatch(&pTq->lock); for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); - if(pHandle->msg == NULL){ + if(ASSERT(pHandle->msg != NULL)){ tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; } - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen}; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; } taosArrayClear(pTq->pPushArray); // unlock diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f76e641f2b..3f92414c34 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -181,15 +181,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); - if(pHandle->msg != NULL){ + if(ASSERT(pHandle->msg == NULL)){ tqError("pHandle->msg should be null"); + taosWUnLockLatch(&pTq->lock); + goto end; } pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); taosArrayPush(pTq->pPushArray, &pHandle); taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); return code; } -- GitLab