tqDebug("tmq poll: consumer:0x%"PRIx64"vgId:%d, topic:%s, set handle exec, pHandle:%p",consumerId,vgId,req.subKey,pHandle);
tqDebug("tmq poll: consumer:0x%"PRIx64"vgId:%d, topic:%s, set handle exec, pHandle:%p",consumerId,vgId,
req.subKey,pHandle);
taosWUnLockLatch(&pTq->lock);
taosWUnLockLatch(&pTq->lock);
break;
break;
}
}
taosWUnLockLatch(&pTq->lock);
taosWUnLockLatch(&pTq->lock);
tqDebug("tmq poll: consumer:0x%"PRIx64"vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",consumerId,vgId,req.subKey,pHandle);
tqDebug("tmq poll: consumer:0x%"PRIx64
"vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
consumerId,vgId,req.subKey,pHandle);
taosMsleep(10);
taosMsleep(10);
}
}
// 3. update the epoch value
// 3. update the epoch value
if(pHandle->epoch<reqEpoch){
if(pHandle->epoch<reqEpoch){
tqDebug("tmq poll: consumer:0x%"PRIx64" epoch update from %d to %d by poll req",consumerId,pHandle->epoch,reqEpoch);
tqDebug("tmq poll: consumer:0x%"PRIx64" epoch update from %d to %d by poll req",consumerId,pHandle->epoch,
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%"PRIx64" suid:%"PRId64,pVnode->config.vgId,req.subKey,pHandle->consumerId,req.suid);
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%"PRIx64" suid:%"PRId64,