From f2f6fb494c90b1f13add57642013498c80f2be19 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 26 Apr 2022 14:48:24 +0800 Subject: [PATCH] remove ep status and cnt --- source/client/src/tmq.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a9acb781be..66e10f8c6a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -83,9 +83,11 @@ struct tmq_t { // status int8_t status; - int8_t epStatus; int32_t epoch; +#if 0 + int8_t epStatus; int32_t epSkipCnt; +#endif int64_t pollCnt; // timer @@ -464,8 +466,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->epStatus = 0; - pTmq->epSkipCnt = 0; + /*pTmq->epStatus = 0;*/ + /*pTmq->epSkipCnt = 0;*/ // set conf strcpy(pTmq->clientId, conf->clientId); @@ -975,7 +977,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } END: - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ if (pParam->sync) { tsem_post(&pParam->rspSem); } @@ -984,6 +986,7 @@ END: int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t code = 0; +#if 0 int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); @@ -991,11 +994,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { if (epSkipCnt < 5000) return 0; } atomic_store_32(&tmq->epSkipCnt, 0); +#endif int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } req->consumerId = htobe64(tmq->consumerId); @@ -1006,7 +1010,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { if (pParam == NULL) { tscError("failed to malloc subscribe param"); taosMemoryFree(req); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } pParam->tmq = tmq; @@ -1018,7 +1022,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(req); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } -- GitLab