diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3f310ee9c09753d143e5c44e33506651c2765881..10e520d9ec49a53e5fcedcf668a40732480aa75b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -356,31 +356,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } + ASSERT(pIter == NULL); // 7. handle unassigned vg if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { // if has consumer, assign all left vg while (1) { + SMqConsumerEp *pConsumerEp = NULL; pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) break; - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - ASSERT(pIter); + if (pRemovedIter == NULL) { + if (pIter != NULL) { + taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); + pIter = NULL; + } + break; + } + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + ASSERT(pIter); + pConsumerEp = (SMqConsumerEp *)pIter; + ASSERT(pConsumerEp->consumerId > 0); + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + break; + } + } pRebVg = (SMqRebOutputVg *)pRemovedIter; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - ASSERT(pConsumerEp->consumerId > 0); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { - mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); continue; } taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { @@ -571,7 +584,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); /*ASSERT(pTopic);*/ if (pTopic == NULL) { - mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic); + mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic); continue; } taosRLockLatch(&pTopic->lock); @@ -601,7 +614,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // TODO replace assert with error check if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { - mError("persist rebalance output error, possibly vnode splitted or dropped"); + mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped"); } taosArrayDestroy(pRebInfo->lostConsumers); taosArrayDestroy(pRebInfo->newConsumers); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 02089d9fecbde6074c574af601c0104751839357..8c4fbe79710a65941c49f87422a0501bb5af6c4f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1175,19 +1175,19 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; + bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; bool isClosed = false; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; - if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { + if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) { win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } - bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) && + bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); if ((update || closedWin) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);