From 68e0ddfa8831905c1546600718b5178b2776c7d2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 17 Oct 2022 16:28:55 +0800 Subject: [PATCH] refactor: tmq client --- include/common/tmsg.h | 2 +- source/client/src/clientTmq.c | 44 +++++++++++++++++++---------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9f21ee007f..5e2ca8896a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { if (pRebInfo == NULL) { return NULL; } - strcpy(pRebInfo->key, key); + tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN); pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebInfo->lostConsumers == NULL) { goto _err; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 51005e3c30..8f94d88b1c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -233,12 +233,12 @@ void tmq_conf_destroy(tmq_conf_t* conf) { tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { if (strcmp(key, "group.id") == 0) { - strcpy(conf->groupId, value); + tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN); return TMQ_CONF_OK; } if (strcmp(key, "client.id") == 0) { - strcpy(conf->clientId, value); + tstrncpy(conf->clientId, value, 256); return TMQ_CONF_OK; } @@ -452,7 +452,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT int32_t code; tEncodeSize(tEncodeSTqOffset, pOffset, len, code); if (code < 0) { - ASSERT(0); return -1; } void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); @@ -464,15 +463,22 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT SEncoder encoder; tEncoderInit(&encoder, abuf, len); tEncodeSTqOffset(&encoder, pOffset); + tEncoderClear(&encoder); // build param SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + taosMemoryFree(buf); + return -1; + } pParam->params = pParamSet; pParam->pOffset = pOffset; // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { + taosMemoryFree(buf); + taosMemoryFree(pParam); return -1; } pMsgSendInfo->msgInfo = (SDataBuf){ @@ -547,6 +553,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { + tsem_destroy(&pParamSet->rspSem); + taosMemoryFree(pParamSet); goto FAIL; } goto HANDLE_RSP; @@ -565,6 +573,7 @@ HANDLE_RSP: tsem_wait(&pParamSet->rspSem); code = pParamSet->rspErr; tsem_destroy(&pParamSet->rspSem); + taosMemoryFree(pParamSet); return code; } else { code = 0; @@ -587,7 +596,14 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + if (async) { + if (automatic) { + tmq->commitCb(tmq, code, tmq->commitCbUserParam); + } else { + userCb(tmq, code, userParam); + } + } return -1; } @@ -642,16 +658,6 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t code = pParamSet->rspErr; tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); - } else { - code = 0; - } - - if (code != 0 && async) { - if (automatic) { - tmq->commitCb(tmq, code, tmq->commitCbUserParam); - } else { - userCb(tmq, code, userParam); - } } #if 0 @@ -722,6 +728,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pReq); + goto OVER; } sendInfo->msgInfo = (SDataBuf){ .pData = pReq, @@ -871,8 +878,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), - pTmq->groupId); + tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId); return NULL; } @@ -1061,9 +1067,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = 0; FAIL: if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); - if (code != 0 && buf) { - taosMemoryFree(buf); - } + taosMemoryFree(buf); + return code; } @@ -1101,7 +1106,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { - taosMemoryFree(pMsg->pData); tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); goto CREATE_MSG_FAIL; } -- GitLab