提交 ac5862c3 编写于 作者: wmmhello's avatar wmmhello

feat:add parameters for consumer & add offset rows for subscription

上级 3f46a9bb
...@@ -515,6 +515,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { ...@@ -515,6 +515,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
SMqConsumerEp newEp = { SMqConsumerEp newEp = {
.consumerId = pConsumerEp->consumerId, .consumerId = pConsumerEp->consumerId,
.vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
.offsetRows = NULL,
}; };
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
} }
......
...@@ -255,7 +255,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI ...@@ -255,7 +255,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
for (int32_t i = 0; i < numOfNewConsumers; i++) { for (int32_t i = 0; i < numOfNewConsumers; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
SMqConsumerEp newConsumerEp; SMqConsumerEp newConsumerEp = {0};
newConsumerEp.consumerId = consumerId; newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册