From 7510a0e7a5e68c0a36a82176b5d42f2f8cd2465c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 4 Mar 2022 18:00:26 +0800 Subject: [PATCH] remove createRequest in poll --- source/client/src/tmq.c | 115 ++++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f3bafd73fc..35fc557eea 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -202,6 +202,26 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { return 0; } +void tmqClearUnhandleMsg(tmq_t* tmq) { + tmq_message_t* msg; + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } + + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } +} + int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; @@ -845,6 +865,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j); if (pVg->vgId == pOffset->vgId) { pVg->currentOffset = pOffset->offset; + tmqClearUnhandleMsg(tmq); return TMQ_RESP_ERR__SUCCESS; } } @@ -883,26 +904,6 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien return pReq; } -void tmqClearUnhandleMsg(tmq_t* tmq) { - tmq_message_t* msg; - while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else - break; - } - - taosReadAllQitems(tmq->mqueue, tmq->qall); - while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else - break; - } -} - tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* msg = NULL; for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -919,29 +920,35 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { // TODO: out of mem return NULL; } - SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); - if (param == NULL) { + + SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam)); + if (pParam == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem return NULL; } - param->tmq = tmq; - param->pVg = pVg; - param->epoch = tmq->epoch; - param->sync = 1; - param->msg = &msg; - tsem_init(¶m->rspSem, 0, 0); - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ + pParam->tmq = tmq; + pParam->pVg = pVg; + pParam->epoch = tmq->epoch; + pParam->sync = 1; + pParam->msg = &msg; + tsem_init(&pParam->rspSem, 0, 0); + + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + return NULL; + } + + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL, }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->param = param; + sendInfo->param = pParam; sendInfo->fp = tmqPollCb; + sendInfo->msgType = TDMT_VND_CONSUME; int64_t transporterId = 0; /*printf("send poll\n");*/ @@ -950,7 +957,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { pVg->pollCnt++; tmq->pollCnt++; - tsem_wait(¶m->rspSem); + tsem_wait(&pParam->rspSem); tmq_message_t* nmsg = NULL; while (1) { taosReadQitem(tmq->mqueue, (void**)&nmsg); @@ -978,32 +985,40 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem tsem_post(&tmq->rspSem); return -1; } - SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); - if (param == NULL) { + SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam)); + if (pParam == NULL) { + free(pReq); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem tsem_post(&tmq->rspSem); return -1; } - param->tmq = tmq; - param->pVg = pVg; - param->epoch = tmq->epoch; - param->sync = 0; - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ + pParam->tmq = tmq; + pParam->pVg = pVg; + pParam->epoch = tmq->epoch; + pParam->sync = 0; + + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + free(pReq); + free(pParam); + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL, }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->param = param; + sendInfo->param = pParam; sendInfo->fp = tmqPollCb; + sendInfo->msgType = TDMT_VND_CONSUME; int64_t transporterId = 0; /*printf("send poll\n");*/ @@ -1053,7 +1068,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return rspMsg; } else { - printf("epoch mismatch\n"); + /*printf("epoch mismatch\n");*/ taosFreeQitem(rspMsg); } } else { @@ -1107,9 +1122,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { while (1) { /*printf("cycle\n");*/ - if (atomic_load_32(&tmq->waitingRequest) == 0) { - tmqPollImpl(tmq, blocking_time); - } + tmqPollImpl(tmq, blocking_time); tsem_wait(&tmq->rspSem); -- GitLab