未验证 提交 e7c542bf 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #16155 from taosdata/feature/stream

enh(stream): reduce table scan
......@@ -356,31 +356,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
}
ASSERT(pIter == NULL);
// 7. handle unassigned vg
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
// if has consumer, assign all left vg
while (1) {
SMqConsumerEp *pConsumerEp = NULL;
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) break;
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
if (pRemovedIter == NULL) {
if (pIter != NULL) {
taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
pIter = NULL;
}
break;
}
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
break;
}
}
pRebVg = (SMqRebOutputVg *)pRemovedIter;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
continue;
}
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
} else {
......@@ -571,7 +584,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
/*ASSERT(pTopic);*/
if (pTopic == NULL) {
mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic);
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
continue;
}
taosRLockLatch(&pTopic->lock);
......@@ -601,7 +614,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// TODO replace assert with error check
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped");
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
......
......@@ -1175,19 +1175,19 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
if ((update || closedWin) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册