From 9ca77572d98ae5fa9c65718434186d20591a6049 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Mar 2023 14:09:42 +0800 Subject: [PATCH] fix(tmq): fix race condition. --- source/client/src/clientTmq.c | 197 ++++++++++++++++++++-------------- 1 file changed, 114 insertions(+), 83 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6a9f88ce15..3748142b0d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -106,17 +106,13 @@ struct tmq_t { tmr_h reportTimer; tmr_h commitTimer; - // connection - STscObj* pTscObj; - - // container - SArray* clientTopics; // SArray - STaosQueue* mqueue; // queue of rsp - STaosQall* qall; - STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit - - // ctl - tsem_t rspSem; + STscObj* pTscObj; // connection + SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of rsp + STaosQall* qall; + STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit + TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets. + tsem_t rspSem; }; enum { @@ -213,9 +209,9 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; STqOffset* pOffset; - SMqClientVg* pMqVg; - /*char topicName[TSDB_TOPIC_FNAME_LEN];*/ - /*int32_t vgId;*/ + char topicName[TSDB_TOPIC_FNAME_LEN]; + int32_t vgId; + tmq_t* pTmq; } SMqCommitCbParam; static int32_t tmqAskEp(tmq_t* tmq, bool async); @@ -431,6 +427,7 @@ static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; + // push into array #if 0 if (code == 0) { @@ -440,15 +437,34 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } #endif - // there may be race condition. fix it - if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) { - SMqClientVg* pMqVg = pParam->pMqVg; + if (pBuf->pEpSet != NULL) { + // todo extract method + taosThreadMutexLock(&pParam->pTmq->lock); + + int32_t numOfTopics = taosArrayGetSize(pParam->pTmq->clientTopics); + for(int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopic = taosArrayGet(pParam->pTmq->clientTopics, i); + if (strcmp(pTopic->topicName, pParam->topicName) != 0) { + continue; + } + + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for(int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if (pClientVg->vgId == pParam->vgId) { + SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); + SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet)); + uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId, + pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); + pClientVg->epSet = *pBuf->pEpSet; + break; + } + } + + break; + } - SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); - SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet)); - uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId, - pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); - pParam->pMqVg->epSet = *pBuf->pEpSet; + taosThreadMutexUnlock(&pParam->pTmq->lock); } taosMemoryFree(pParam->pOffset); @@ -508,7 +524,10 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pParam->params = pParamSet; pParam->pOffset = pOffset; - pParam->pMqVg = pVg; // there may be an race condition + pParam->vgId = pVg->vgId; + pParam->pTmq = tmq; + + tstrncpy(pParam->topicName, pTopic->topicName, tListLen(pParam->topicName)); // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -659,6 +678,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + // todo race condition: fix it int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); @@ -944,10 +964,14 @@ void tmqFreeImpl(void* handle) { tmqClearUnhandleMsg(tmq); taosCloseQueue(tmq->mqueue); } - if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask); - taosFreeQall(tmq->qall); + if (tmq->delayedTask) { + taosCloseQueue(tmq->delayedTask); + } + + taosFreeQall(tmq->qall); tsem_destroy(&tmq->rspSem); + taosThreadMutexDestroy(&tmq->lock); int32_t sz = taosArrayGetSize(tmq->clientTopics); for (int32_t i = 0; i < sz; i++) { @@ -955,6 +979,7 @@ void tmqFreeImpl(void* handle) { taosMemoryFreeClear(pTopic->schema.pSchema); taosArrayDestroy(pTopic->vgs); } + taosArrayDestroy(tmq->clientTopics); taos_close_internal(tmq->pTscObj); taosMemoryFree(tmq); @@ -993,9 +1018,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->mqueue = taosOpenQueue(); - pTmq->qall = taosAllocateQall(); pTmq->delayedTask = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); + taosThreadMutexInit(&pTmq->lock, NULL); if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1193,7 +1219,6 @@ FAIL: } void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { - // conf->commitCb = cb; conf->commitCbUserParam = param; } @@ -1328,7 +1353,53 @@ CREATE_MSG_FAIL: return -1; } -bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { +static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, + tmq_t* tmq) { + pTopic->schema = pTopicEp->schema; + pTopicEp->schema.nCols = 0; + pTopicEp->schema.pSchema = NULL; + + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); + + tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); + + tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); + + for (int32_t j = 0; j < vgNumGet; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId); + STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); + STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; + if (pOffset != NULL) { + offsetNew = *pOffset; + } + + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = offsetNew, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + .vgSkipCnt = 0, + }; + + taosArrayPush(pTopic->vgs, &clientVg); + } +} + +static void freeClientVgInfo(void* param) { + SMqClientTopic* pTopic = param; + if (pTopic->schema.nCols) { + taosMemoryFreeClear(pTopic->schema.pSchema); + } + + taosArrayDestroy(pTopic->vgs); +} + +static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); @@ -1343,12 +1414,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { return false; } - SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - if (pHash == NULL) { + SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); + if (pVgOffsetHashMap == NULL) { taosArrayDestroy(newTopics); return false; } + // todo extract method for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); @@ -1362,7 +1434,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { tFormatOffset(buf, 80, &pVgCur->currentOffset); tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); + taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); } } } @@ -1370,62 +1442,25 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { for (int32_t i = 0; i < topicNumGet; i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); - topic.schema = pTopicEp->schema; - pTopicEp->schema.nCols = 0; - pTopicEp->schema.pSchema = NULL; - tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); - tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); - - tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName); - - int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); - - for (int32_t j = 0; j < vgNumGet; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); - STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); - STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; - if (pOffset != NULL) { - offsetNew = *pOffset; - } - - SMqClientVg clientVg = { - .pollCnt = 0, - .currentOffset = offsetNew, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, - .vgSkipCnt = 0, - }; - taosArrayPush(topic.vgs, &clientVg); - set = true; - } + initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq); taosArrayPush(newTopics, &topic); } // destroy current buffered existed topics info if (tmq->clientTopics) { - int32_t sz = taosArrayGetSize(tmq->clientTopics); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema); - taosArrayDestroy(pTopic->vgs); - } - - taosArrayDestroy(tmq->clientTopics); + taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); } - taosHashCleanup(pHash); - tmq->clientTopics = newTopics; + taosHashCleanup(pVgOffsetHashMap); - if (taosArrayGetSize(tmq->clientTopics) == 0) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); - } else { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); - } + taosThreadMutexLock(&tmq->lock); + tmq->clientTopics = newTopics; + taosThreadMutexUnlock(&tmq->lock); + int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY; + atomic_store_8(&tmq->status, flag); atomic_store_32(&tmq->epoch, epoch); + tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); return set; } @@ -1448,7 +1483,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { } pParam->code = code; - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async, tstrerror(code)); goto END; @@ -1471,8 +1506,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (!async) { SMqAskEpRsp rsp; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); - /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/ - /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/ tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { @@ -1493,7 +1526,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { } END: - /*atomic_store_8(&tmq->epStatus, 0);*/ if (!async) { tsem_post(&pParam->rspSem); } else { @@ -1545,7 +1577,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId); taosMemoryFree(pReq); - /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } @@ -1559,7 +1590,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(pReq); - /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } @@ -1995,6 +2025,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tmq_list_destroy(lst); } + taosRemoveRef(tmqMgmt.rsetId, tmq->refId); return 0; } -- GitLab