diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a3b1c4f7791879361630566cc2aead282b0e63a2..59c931d9aa8b30bce9933c322db2bf50c8611093 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "consumer_group"); + tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 369c7fabd904300ad4b53597f49352800a625bdb..01d89bc718c53d66441dd46318a794f8da4e0553 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -100,8 +100,8 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { return 0; } - mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId, - mndConsumerStatusName(pConsumer->status)); + mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, + pConsumer->status, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS__READY) { mndReleaseConsumer(pMnode, pConsumer); @@ -114,9 +114,17 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); - if (pTrans == NULL) goto FAIL; - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; - if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; + if (pTrans == NULL) { + goto FAIL; + } + + if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { + goto FAIL; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + goto FAIL; + } tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); @@ -135,8 +143,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); ASSERT(pConsumer); - mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, - mndConsumerStatusName(pConsumer->status)); + mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, + pConsumer->status, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { mndReleaseConsumer(pMnode, pConsumer); @@ -150,7 +158,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); - if (pTrans == NULL) goto FAIL; + if (pTrans == NULL) { + goto FAIL; + } + if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -244,21 +255,24 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) { - SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); - - pLostMsg->consumerId = pConsumer->consumerId; - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_LOST, - .pCont = pLostMsg, - .contLen = sizeof(SMqConsumerLostMsg), - }; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - } + + mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64, + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime); if (status == MQ_CONSUMER_STATUS__READY) { - // do nothing + if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); + + pLostMsg->consumerId = pConsumer->consumerId; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_TMQ_CONSUMER_LOST, + .pCont = pLostMsg, + .contLen = sizeof(SMqConsumerLostMsg), + }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); @@ -379,11 +393,18 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { + mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } - ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); + int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)); + if (ret != 0) { + mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, + pConsumer->cgroup); + terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; + return -1; + } atomic_store_32(&pConsumer->hbStatus, 0); @@ -392,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { #if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { - mInfo("try to recover consumer:0x%"PRIx64 "", consumerId); + mInfo("try to recover consumer:0x%"PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; @@ -401,6 +422,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } #endif @@ -416,7 +438,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2. check epoch, only send ep info when epochs do not match if (epoch != serverEpoch) { taosRLockLatch(&pConsumer->lock); - mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); + mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, serverEpoch); int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); @@ -426,7 +448,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { goto FAIL; } - // handle all topic subscribed by the consumer + // handle all topics subscribed by this consumer for (int32_t i = 0; i < numOfTopics; i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); @@ -455,6 +477,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); + // this customer assigned vgroups topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); if (topicEp.vgs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -484,6 +507,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { } taosRUnLockLatch(&pConsumer->lock); } + // encode rsp int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); @@ -491,6 +515,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; ((SMqRspHead *)buf)->epoch = serverEpoch; ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; @@ -506,6 +531,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { pMsg->info.rsp = buf; pMsg->info.rspLen = tlen; return 0; + FAIL: tDeleteSMqAskEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); @@ -547,7 +573,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(newSub); - // check topic existance + // check topic existence STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; @@ -718,13 +744,14 @@ CM_ENCODE_OVER: } SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { - terrno = TSDB_CODE_OUT_OF_MEMORY; SSdbRow *pRow = NULL; SMqConsumerObj *pConsumer = NULL; void *buf = NULL; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) { + goto CM_DECODE_OVER; + } if (sver != MND_CONSUMER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; @@ -732,52 +759,62 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { } pRow = sdbAllocRow(sizeof(SMqConsumerObj)); - if (pRow == NULL) goto CM_DECODE_OVER; + if (pRow == NULL) { + goto CM_DECODE_OVER; + } pConsumer = sdbGetRowObj(pRow); - if (pConsumer == NULL) goto CM_DECODE_OVER; + if (pConsumer == NULL) { + goto CM_DECODE_OVER; + } int32_t dataPos = 0; int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); buf = taosMemoryMalloc(len); - if (buf == NULL) goto CM_DECODE_OVER; + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto CM_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO set correct error code goto CM_DECODE_OVER; } - tmsgUpdateDnodeEpSet(&pConsumer->ep); - terrno = TSDB_CODE_SUCCESS; + tmsgUpdateDnodeEpSet(&pConsumer->ep); CM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer == NULL ? 0 : pConsumer->consumerId, - pRaw, terrstr()); + mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s", + pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr()); taosMemoryFreeClear(pRow); - return NULL; } return pRow; } static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId); + mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action", + pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status), + pConsumer->epoch); pConsumer->subscribeTime = pConsumer->upTime; return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId); + mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, mndConsumerStatusName(pConsumer->status)); tDeleteSMqConsumerObj(pConsumer); return 0; } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { - mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); + mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%"PRId64", uptime:%"PRId64, + pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); taosWLockLatch(&pOldConsumer->lock); @@ -817,7 +854,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; + int32_t status = pOldConsumer->status; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; + mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", + pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), + pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); @@ -844,7 +885,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // not exist in current topic bool existing = false; #if 1 - for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) { + int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics); + for (int32_t i = 0; i < numOfExistedTopics; i++) { char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); if (strcmp(topic, addedTopic) == 0) { existing = true; @@ -869,27 +911,28 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } // set status + int32_t status = pOldConsumer->status; if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { - if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || - pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { + if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; - } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || - pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { + } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; } } else { - if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || - pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { + if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB; - } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST || - pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) { + } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB; } } + // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = pNewConsumer->upTime; atomic_add_fetch_32(&pOldConsumer->epoch, 1); + mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d", + pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), + pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1); @@ -928,27 +971,27 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ASSERT(i < sz); // set status + int32_t status = pOldConsumer->status; if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { - if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || - pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { + if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; - } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || - pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { + } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; } } else { - if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || - pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { + if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB; - } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST || - pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) { + } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB; } } pOldConsumer->rebalanceTime = pNewConsumer->upTime; - atomic_add_fetch_32(&pOldConsumer->epoch, 1); + + mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d", + pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), + pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics)); } taosWUnLockLatch(&pOldConsumer->lock); @@ -1036,8 +1079,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * if (hasTopic) { char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i)); - tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN); - varDataSetLen(topic, strlen(varDataVal(topic))); + STR_TO_VARSTR(topic, topicName); colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); } else { colDataSetVal(pColInfo, numOfRows, NULL, true); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 220e3051115b26f7a8cee6eb3f8407693aa7f172..b843534fcd7d782eac154842df51e80957265abc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -561,17 +561,17 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { goto _OVER; } - mInfo("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); + mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql); if (mndCheckCreateTopicReq(&createTopicReq) != 0) { - mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); + mError("topic:%s failed to create since %s", createTopicReq.name, terrstr()); goto _OVER; } pTopic = mndAcquireTopic(pMnode, createTopicReq.name); if (pTopic != NULL) { if (createTopicReq.igExists) { - mInfo("topic:%s, already exist, ignore exist is set", createTopicReq.name); + mInfo("topic:%s already exist, ignore exist is set", createTopicReq.name); code = 0; goto _OVER; } else { @@ -731,8 +731,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pTopic->ntbUid != 0) { // broadcast to all vnode - void *pIter = NULL; + pIter = NULL; SVgObj *pVgroup = NULL; + while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 04c2ed3f382612457d2796fc0ec2773ebe31bfa9..841dc29731b38a2e4d63e5c27aa351c31fe14b18 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -746,7 +746,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8f8b32ff07f83f92475d72712dfe7b015a72b6a7..19c9c6f97d2722058dbc90d8973eedee7bb51700 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -279,11 +279,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + int32_t numOfUids = taosArrayGetSize(tableIdList); + if (numOfUids == 0) { + return qa; + } // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); - for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + for (int32_t i = 0; i < numOfUids; ++i) { uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i); int32_t code = metaGetTableEntryByUid(&mr, *id);