From 22a85734c69528e977663a2472b51e4b17cac079 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Mar 2023 13:23:50 +0800 Subject: [PATCH] fix(mq): add more checks for the rebalance counter to prevent it from becoming negative. --- include/util/taoserror.h | 1 - source/client/src/clientTmq.c | 7 +------ source/client/test/clientTests.cpp | 9 +++++++- source/dnode/mnode/impl/src/mndConsumer.c | 24 ++++++++++++++++------ source/dnode/mnode/impl/src/mndMain.c | 3 +++ source/dnode/mnode/impl/src/mndSubscribe.c | 6 +++--- source/dnode/vnode/src/tq/tqRead.c | 9 ++++---- source/libs/executor/src/scanoperator.c | 6 +++++- source/util/src/terror.c | 1 - 9 files changed, 42 insertions(+), 24 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 61e181bc36..5106196ccd 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -558,7 +558,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) #define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C) #define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D) -#define TSDB_CODE_TQ_NO_SUBSCRIBE_TOPICS TAOS_DEF_ERROR_CODE(0, 0x0A0E) // wal // #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) // 2.x diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 00e29f5ecc..d08cabd27e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1076,12 +1076,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - if (sz == 0) { - tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics, not allowed", tmq->consumerId, tmq->groupId, sz); - return TSDB_CODE_TQ_NO_SUBSCRIBE_TOPICS; - } else { - tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); - } + tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); 
req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 59c931d9aa..2f3d600019 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -925,7 +925,7 @@ TEST(clientCase, subscription_test) { // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "topic_t1"); +// tmq_list_append(topicList, "topic_t1"); // 启动订阅 tmq_subscribe(tmq, topicList); @@ -938,6 +938,8 @@ TEST(clientCase, subscription_test) { int32_t msgCnt = 0; int32_t timeout = 5000; + int32_t count = 0; + while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); if (pRes) { @@ -952,6 +954,11 @@ TEST(clientCase, subscription_test) { printf("db: %s\n", dbName); printf("vgroup id: %d\n", vgroupId); + if (count ++ > 20) { + tmq_unsubscribe(tmq); + break; + } + while (1) { TAOS_ROW row = taos_fetch_row(pRes); if (row == NULL) break; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 90f5f8c839..e52b046053 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -82,18 +82,29 @@ bool mndRebTryStart() { } void mndRebEnd() { - int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance end, rebalance count:%d", val); + mndRebCntDec(); } void mndRebCntInc() { int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance trans start, rebalance count:%d", val); + mInfo("rebalance trans start, rebalance counter:%d", val); } void mndRebCntDec() { - int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance trans end, rebalance count:%d", val); + while (1) { + int32_t val = atomic_load_32(&mqRebInExecCnt); + if (val <= 0) { + mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val); + break; + } + + int32_t newVal = val - 1; + int32_t oldVal = 
atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); + if (oldVal == val) { + mInfo("rebalance trans end, rebalance counter:%d", newVal); + break; + } + } } static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { @@ -308,6 +319,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosRUnLockLatch(&pConsumer->lock); } else if (status == MQ_CONSUMER_STATUS__MODIFY) { taosRLockLatch(&pConsumer->lock); + int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); for (int32_t i = 0; i < newTopicNum; i++) { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -700,6 +712,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { // no topics need to be rebalanced if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { +// mInfo(); goto _over; } @@ -1057,7 +1070,6 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * } taosRLockLatch(&pConsumer->lock); - mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index b09a4f63a7..d83b969e2d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -706,6 +706,9 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { } else if (code == 0) { mGTrace("msg:%p, successfully processed", pMsg); } else { + if (code == -1) { + code = terrno; + } mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 648014d97e..86f6976398 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -263,7 +263,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR imbConsumerNum = totalVgNum % afterRebConsumerNum; } - 
mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub, + mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has more vgs", sub, afterRebConsumerNum, minVgCnt, imbConsumerNum); // 4. first scan: remove consumer more than wanted, put to remove hash @@ -591,13 +591,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key); if (rebOutput.pSub == NULL) { - mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr()); + mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr()); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); continue; } - memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); + memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f4c96d073b..8712a93bff 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -297,7 +297,7 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { tqError("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id); return -1; } else { - tqError("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); + tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); return 0; } } @@ -308,13 +308,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { if (walNextValidMsg(pReader->pWalReader) < 0) { - pReader->ver = - pReader->pWalReader->curVersion - pReader->pWalReader->curStopped; -// pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); + pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped; ret->offset.type = TMQ_OFFSET__LOG; + 
ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; - tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); + tqDebug("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8576ac74c7..ccc721996f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1632,8 +1632,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { SFetchRet ret = {0}; if (tqNextBlock(pInfo->tqReader, &ret) < 0) { - qError("failed to get next log block since %s", terrstr()); + // if the end is reached, terrno is 0 + if (terrno != 0) { + qError("failed to get next log block since %s", terrstr()); + } } + if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); setBlockIntoRes(pInfo, &ret.data, true); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6e63776afe..33b562c8dd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -436,7 +436,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_META_KEY_DUP_IN_TXN, "TQ met key dup in txn TAOS_DEFINE_ERROR(TSDB_CODE_TQ_GROUP_NOT_SET, "TQ group not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not found") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no committed offset") -TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_SUBSCRIBE_TOPICS, "TQ no topics") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") -- GitLab