diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 33324552dce24137d2a778d0b33f4b529cbbb4a3..0a31eac09cde3f724a20bba52b0442b59c7bda98 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values. - pVg->currentOffset = pDataRsp->rspOffset; + if(pDataRsp->rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values. + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); char buf[80]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cbf856a79930d40096bcf36271bb46b7ee86fb91..2639b81c39ec77fe1a5414cc5960ed021d8c301c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } @@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1b29b340735fbaa32d75e72284d9a5c4909936fe..30b2fb74ca3251184b3d116db81046a3a9d81919 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -114,7 +114,7 @@ struct STQ { char* path; int64_t walLogLastVer; SRWLatch lock; - SHashObj* pPushMgr; // consumerId -> STqHandle + SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 53a40eb8390fd05010cf9d808827ed46696c86f8..bd29c8019debf9b458b8852397e54375519b4a43 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -551,13 +551,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, -1); + atomic_store_32(&pHandle->epoch, 0); // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo);