diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 2f3d60001944365451e312baf48097169f4587bf..82d29b37ebfbd2c57d3fa4d4afe2533b2d3a048c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -925,7 +925,7 @@ TEST(clientCase, subscription_test) { // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); -// tmq_list_append(topicList, "topic_t1"); + tmq_list_append(topicList, "topic_t1"); // 启动订阅 tmq_subscribe(tmq, topicList); @@ -954,7 +954,7 @@ TEST(clientCase, subscription_test) { printf("db: %s\n", dbName); printf("vgroup id: %d\n", vgroupId); - if (count ++ > 20) { + if (count ++ > 200) { tmq_unsubscribe(tmq); break; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4d19110f31ffa9e01a38c4bd60ab375e608a65d4..924216bcbf87b312221ca1782c3e56e09b951fde 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -224,7 +224,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId); + mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, sub, pVgEp->vgId, consumerId); } taosArrayDestroy(pConsumerEp->vgs); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); @@ -329,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, sub, consumerId); } } @@ -357,7 +357,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } @@ -387,12 +387,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { - mInfo("mq rebalance: skip vg %d for same consumer:%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); continue; } taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { @@ -1019,7 +1019,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); - mDebug("mnd show subscriptions: topic %s, consumer:%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), + mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); // offset