From 2f6c56539242fba80f24ce896cbf9f7600fc9f61 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Jul 2023 11:48:53 +0800 Subject: [PATCH] fix:pHandle not load if taosd restart --- source/dnode/mnode/impl/src/mndConsumer.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 16 ++++++++++++---- source/dnode/vnode/src/tq/tqMeta.c | 6 +++--- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6081b9a530..f9673b67cd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mInfo("tq timer, rebalance counter old val:%d", old); + mDebug("tq timer, rebalance counter old val:%d", old); return old == 0; } @@ -116,7 +116,7 @@ void mndRebCntDec() { int32_t newVal = val - 1; int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); if (oldVal == val) { - mInfo("rebalance trans end, rebalance counter:%d", newVal); + mDebug("rebalance trans end, rebalance counter:%d", newVal); break; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89ed3ca1c7..d543e2afbd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -522,10 +522,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 1. find handle pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { - tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_MSG; - taosWUnLockLatch(&pTq->lock); - return -1; + do{ + if (tqMetaGetHandle(pTq, req.subKey) == 0){ + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if(pHandle != NULL){ + break; + } + } + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_MSG; + taosWUnLockLatch(&pTq->lock); + return -1; + }while(0); } // 2. check re-balance status diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index df1c9ca7c9..08019f8a76 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -338,7 +338,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ taosArrayDestroy(tbUidList); return -1; } - tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); + tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); handle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); @@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ if(buildHandle(pTq, handle) < 0){ return -1; } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } @@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ if(buildHandle(pTq, handle) < 0){ return -1; } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -- GitLab