提交 d7a48171 编写于 作者: wmmhello's avatar wmmhello

feat:add parameters for consumer & add offset rows for subscription

上级 ac5862c3
...@@ -433,8 +433,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -433,8 +433,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
if(pConsumerEp){
pConsumerEp->offsetRows = data->offsetRows; pConsumerEp->offsetRows = data->offsetRows;
data->offsetRows = NULL; data->offsetRows = NULL;
}
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册