From 9ebdee326f2d53c76056ae1d4a5bcc474646ba12 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 1 Apr 2022 18:13:46 +0800 Subject: [PATCH] fix txn --- source/client/src/tmq.c | 29 ++++++++++++++-------- source/dnode/mnode/impl/src/mndConsumer.c | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 25 ++++++++----------- source/dnode/vnode/src/tq/tq.c | 11 ++++---- source/libs/catalog/src/catalog.c | 4 +-- tests/script/tsim/tmq/basic1.sim | 2 +- 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 3e4b25c9e7..f17df3e6f1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -78,6 +78,7 @@ struct tmq_t { STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; + int8_t epStatus; int32_t waitingRequest; int32_t readyRequest; SArray* clientTopics; // SArray @@ -311,6 +312,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->epoch = 0; pTmq->waitingRequest = 0; pTmq->readyRequest = 0; + pTmq->epStatus = 0; // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -833,7 +835,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tmq_t* tmq = pParam->tmq; if (code != 0) { tscWarn("msg discard, code:%x", code); - goto WRITE_QUEUE_FAIL; + goto CREATE_MSG_FAIL; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -873,7 +875,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { - goto WRITE_QUEUE_FAIL; + goto CREATE_MSG_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); @@ -886,7 +888,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pRsp->msg.numOfTopics == 0) { /*printf("no data\n");*/ taosFreeQitem(pRsp); - goto WRITE_QUEUE_FAIL; + goto CREATE_MSG_FAIL; } #endif @@ -899,7 +901,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tsem_post(&tmq->rspSem); return 0; -WRITE_QUEUE_FAIL: +CREATE_MSG_FAIL: if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } @@ -940,7 +942,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { for (int32_t k = 0; k < vgNumCur; k++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); - printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey); + tscDebug("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); } break; @@ -954,12 +956,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t offset = pVgEp->offset; - printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset); + tscDebug("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset); if (pOffset != NULL) { offset = *pOffset; - printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey); + tscDebug("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey); } - printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset); + tscDebug("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, @@ -1020,6 +1022,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } END: + atomic_store_8(&tmq->epStatus, 0); if (pParam->sync) { tsem_post(&pParam->rspSem); } @@ -1027,6 +1030,10 @@ END: } int32_t tmqAskEp(tmq_t* tmq, bool sync) { + int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); + if (epStatus == 1) { + return 0; + } int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); if (req == NULL) { @@ -1207,7 +1214,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { - /*printf("skip vg %d\n", pVg->vgId);*/ + tscDebug("skip vg %d", pVg->vgId); continue; } SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); @@ -1251,7 +1258,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - /*tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);*/ + tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; @@ -1315,7 +1322,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese tmqHandleNoPollRsp(tmq, rspHead, &reset); taosFreeQitem(rspHead); if (pollIfReset && reset) { - printf("reset and repoll\n"); + tscDebug("reset and repoll\n"); tmqPollImpl(tmq, blockingTime); } } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 8eceea3d06..c4c93b7fc9 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -160,6 +160,7 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); + pOldConsumer->epoch++; // TODO handle update /*taosWLockLatch(&pOldConsumer->lock);*/ diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 195afa3848..1afbb74067 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -446,26 +446,21 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { - SMqConsumerObj* pNewRebConsumer = taosMemoryMalloc(sizeof(SMqConsumerObj)); - ASSERT(pNewRebConsumer); - memcpy(pNewRebConsumer, pRebConsumer, sizeof(SMqConsumerObj)); - pNewRebConsumer->currentTopics = taosArrayDup(pRebConsumer->currentTopics); - pNewRebConsumer->recentRemovedTopics = taosArrayDup(pRebConsumer->recentRemovedTopics); - if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) { - pNewRebConsumer->epoch++; - } + /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/ + /*pRebConsumer->epoch++;*/ + /*}*/ if (vgThisConsumerAfterRb != 0) { - atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } else { - atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pNewRebConsumer->consumerId, status, - pNewRebConsumer->status); + mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, + pRebConsumer->status); - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pNewRebConsumer); + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pConsumerRaw); + mndTransAppendCommitlog(pTrans, pConsumerRaw); } mndReleaseConsumer(pMnode, pRebConsumer); } @@ -512,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { // TODO: log rebalance statistics SSdbRaw *pSubRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); + sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING); mndTransAppendRedolog(pTrans, pSubRaw); } mndReleaseSubscribe(pMnode, pSub); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 438a584842..68ade46efd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + /*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/ SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -299,8 +299,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // response to user break; } - printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, - pReq->epoch); + /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -353,7 +352,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch); + /*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -384,7 +383,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch); + /*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/ /*}*/ return 0; @@ -451,7 +450,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); + /*printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);*/ taosArrayPush(pConsumer->topics, pTopic); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleCommit(pTq->tqMeta, req.consumerId); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e87fdba71d..a3019d8d56 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -2304,8 +2304,6 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - CTG_ERR_RET(ctgStartUpdateThread()); - tsem_init(&gCtgMgmt.queue.reqSem, 0, 0); tsem_init(&gCtgMgmt.queue.rspSem, 0, 0); @@ -2316,6 +2314,8 @@ int32_t catalogInit(SCatalogCfg *cfg) { } gCtgMgmt.queue.tail = gCtgMgmt.queue.head; + CTG_ERR_RET(ctgStartUpdateThread()); + qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); return TSDB_CODE_SUCCESS; diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index afd3f0f9b0..cfcb1ac992 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -35,7 +35,7 @@ sql connect $dbNamme = d0 print =============== create database , vgroup 1 -sql create database $dbNamme vgroups 10 +sql create database $dbNamme vgroups 1 sql show databases print $data00 $data01 $data02 if $rows != 2 then -- GitLab