diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8bdbb794c0e4946de65755e4f12bb67e2a10a499..31ef0caa877abf9f07129923418864fbcd1f92a6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1649,8 +1649,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { typedef struct { int64_t leftForVer; int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; + int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; char* sql; @@ -1659,55 +1658,30 @@ typedef struct { char* qmsg; } SMqSetCVgReq; -static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { - int32_t tlen = 0; - tlen += taosEncodeFixedU64(buf, pMsg->sId); - tlen += taosEncodeFixedU64(buf, pMsg->queryId); - tlen += taosEncodeFixedU64(buf, pMsg->taskId); - tlen += taosEncodeFixedU32(buf, pMsg->sqlLen); - tlen += taosEncodeFixedU32(buf, pMsg->phyLen); - //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); - return tlen; -} - -static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { - buf = taosDecodeFixedU64(buf, &pMsg->sId); - buf = taosDecodeFixedU64(buf, &pMsg->queryId); - buf = taosDecodeFixedU64(buf, &pMsg->taskId); - buf = taosDecodeFixedU32(buf, &pMsg->sqlLen); - buf = taosDecodeFixedU32(buf, &pMsg->phyLen); - //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); - return buf; -} - static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); - tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->qmsg); - //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); - buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->qmsg); - //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9ff099fae0102838edce85cbe9e20059da20d7fd..4adfa9eaf9bc5fc229d741a628eb54abdb03bc4c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -671,50 +671,57 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); - /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); - if (pReq == NULL) { - ASSERT(false); - usleep(blocking_time * 1000); - return NULL; - } + int32_t beginVgIdx = pTopic->nextVgIdx; + while(1) { + pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); + /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); + if (pReq == NULL) { + ASSERT(false); + usleep(blocking_time * 1000); + return NULL; + } - SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); - if (param == NULL) { - ASSERT(false); - usleep(blocking_time * 1000); - return NULL; - } - param->tmq = tmq; - param->retMsg = &tmq_message; - param->pVg = pVg; - tsem_init(¶m->rspSem, 0, 0); + SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); + if (param == NULL) { + ASSERT(false); + usleep(blocking_time * 1000); + return NULL; + } + param->tmq = tmq; + param->retMsg = &tmq_message; + param->pVg = pVg; + tsem_init(¶m->rspSem, 0, 0); - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = param; - sendInfo->fp = tmqPollCb; + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; - /*printf("req offset: %ld\n", pReq->offset);*/ + /*printf("req offset: %ld\n", pReq->offset);*/ - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - tmq->pollCnt++; + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + tmq->pollCnt++; - tsem_wait(¶m->rspSem); - tsem_destroy(¶m->rspSem); - free(param); + tsem_wait(¶m->rspSem); + tsem_destroy(¶m->rspSem); + free(param); - if (tmq_message == NULL) { - usleep(blocking_time * 1000); - } + if (tmq_message == NULL) { + if (beginVgIdx == pTopic->nextVgIdx) { + usleep(blocking_time * 1000); + } else { + continue; + } + } - return tmq_message; + return tmq_message; + } /*tsem_wait(&pRequest->body.rspSem);*/ diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 28e769a7ffba861157dcc9521fcd4628a7946727..2a3e0008a2e94496e6aee7b26f67d5a822668540 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -107,9 +107,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { SMqMVRebReq req = { - .vgId = pConsumerEp->vgId, - .oldConsumerId = pConsumerEp->oldConsumerId, - .newConsumerId = pConsumerEp->consumerId, + .vgId = pConsumerEp->vgId, + .oldConsumerId = pConsumerEp->oldConsumerId, + .newConsumerId = pConsumerEp->consumerId, }; int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); @@ -133,7 +133,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume } static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { - ASSERT(pConsumerEp->oldConsumerId != -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -161,8 +160,7 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { SMqSetCVgReq req = {0}; - req.oldConsumerId = pConsumerEp->consumerId; - req.newConsumerId = -1; + req.consumerId = pConsumerEp->consumerId; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); @@ -220,7 +218,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); - //TODO + // TODO int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); mTrace("try to get sub ep, old val: %d", hbStatus); atomic_store_32(&pConsumer->hbStatus, 0); @@ -232,7 +230,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { rsp.consumerId = consumerId; rsp.epoch = pConsumer->epoch; if (epoch != rsp.epoch) { - mInfo("old epoch %d, new epoch %d", epoch, rsp.epoch); + mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); SArray *pTopics = pConsumer->currentTopics; int sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -404,53 +402,57 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // iterate all consumers, set unassignedVgStash for (int i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; - if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; - else vgThisConsumerAfterRb = vgEachConsumer; + int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int vgThisConsumerAfterRb; + if (i < imbalanceVg) + vgThisConsumerAfterRb = vgEachConsumer + 1; + else + vgThisConsumerAfterRb = vgEachConsumer; - mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); + mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, + vgThisConsumerBeforeRb, vgThisConsumerAfterRb); - while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { + while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); ASSERT(pConsumerEp != NULL); ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); taosArrayPush(pSub->unassignedVg, pConsumerEp); } - SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); - int32_t status = atomic_load_32(&pRebConsumer->status); - if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || - (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || - (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST) - ) { - pRebConsumer->epoch++; - if (vgThisConsumerAfterRb != 0) { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); - } else { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); - } + SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); + int32_t status = atomic_load_32(&pRebConsumer->status); + if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || + (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || + (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { + pRebConsumer->epoch++; + if (vgThisConsumerAfterRb != 0) { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + } else { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); + } - mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); + mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pConsumerRaw); - } - mndReleaseConsumer(pMnode, pRebConsumer); + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pConsumerRaw); + } + mndReleaseConsumer(pMnode, pRebConsumer); } - //assign to vgroup + // assign to vgroup if (taosArrayGetSize(pSub->unassignedVg) != 0) { for (int i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; - if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; - else vgThisConsumerAfterRb = vgEachConsumer; + int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int vgThisConsumerAfterRb; + if (i < imbalanceVg) + vgThisConsumerAfterRb = vgEachConsumer + 1; + else + vgThisConsumerAfterRb = vgEachConsumer; - while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { - SMqConsumerEp* pConsumerEp = taosArrayPop(pSub->unassignedVg); + while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { + SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); ASSERT(pConsumerEp != NULL); pConsumerEp->oldConsumerId = pConsumerEp->consumerId; @@ -458,19 +460,21 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { - char* topic; - char* cgroup; + char *topic; + char *cgroup; mndSplitSubscribeKey(pSub->key, &topic, &cgroup); - SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, pConsumerEp->consumerId); + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, + pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndReleaseTopic(pMnode, pTopic); free(topic); free(cgroup); } else { - mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); + mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, + pConsumerEp->oldConsumerId, pConsumerEp->consumerId); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); } @@ -488,10 +492,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } if (mndTransPrepare(pMnode, pTrans) != 0) { mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + taosHashCleanup(pReq->rebSubHash); mndTransDrop(pTrans); return -1; } + taosHashCleanup(pReq->rebSubHash); mndTransDrop(pTrans); return 0; } @@ -738,15 +744,13 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp) { - ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, - .oldConsumerId = -1, - .newConsumerId = pConsumerEp->consumerId, + .consumerId = pConsumerEp->consumerId, .sql = pTopic->sql, .logicalPlan = pTopic->logicalPlan, .physicalPlan = pTopic->physicalPlan, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index de4929b7dfc90ce121b4080850febca3c6d030ac..34d36fa18d1f7e3461ea42d47a31e91145f60c00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -134,7 +134,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan return NULL; } - int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t reqId = pReq->reqId; @@ -160,7 +159,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); // TODO: support multiple topic in one req if (strcmp(pTopic->topicName, pReq->topic) != 0) { - ASSERT(false); + /*ASSERT(false);*/ continue; } @@ -174,7 +173,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - pTopic->committedOffset = pReq->offset-1; + pTopic->committedOffset = pReq->offset - 1; } rsp.committedOffset = pTopic->committedOffset; @@ -235,7 +234,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { break; } } - //TODO copy + // TODO copy rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; @@ -270,11 +269,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { void* abuf = buf; tEncodeSMqConsumeRsp(&abuf, &rsp); if (rsp.pBlockData) { - taosArrayDestroyEx(rsp.pBlockData, (void(*)(void*))tDeleteSSDataBlock); + taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); rsp.pBlockData = NULL; /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/ - /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ - /*tDeleteSSDataBlock(pBlock);*/ + /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ + /*tDeleteSSDataBlock(pBlock);*/ /*}*/ /*taosArrayDestroy(rsp.pBlockData);*/ } @@ -301,23 +300,20 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); - ASSERT(req.oldConsumerId == -1); - - STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); + /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/ + STqConsumerHandle* pConsumer = calloc(1, sizeof(STqConsumerHandle)); if (pConsumer == NULL) { - pConsumer = calloc(sizeof(STqConsumerHandle), 1); - if (pConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; } + strcpy(pConsumer->cgroup, req.cgroup); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); - pConsumer->consumerId = req.newConsumerId; + pConsumer->consumerId = req.consumerId; pConsumer->epoch = 0; - STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); + STqTopicHandle* pTopic = calloc(1, sizeof(STqTopicHandle)); if (pTopic == NULL) { free(pConsumer); return -1; @@ -337,13 +333,13 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pMeta }; + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } taosArrayPush(pConsumer->topics, pTopic); - tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.newConsumerId); + tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.consumerId); terrno = TSDB_CODE_SUCCESS; return 0; } @@ -429,7 +425,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t numOfCols = pHandle->pSchema->numOfCols; int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); - //TODO: stable case + // TODO: stable case if (colNumNeed > pSchemaWrapper->nCols) { colNumNeed = pSchemaWrapper->nCols; } @@ -445,7 +441,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { j++; } - SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; + SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; SColumnInfoData colInfo = {0}; int sz = numOfRows * pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes;