From 053786e6b44368e71988ac5830d0522690781bb5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Jul 2022 15:50:36 +0800 Subject: [PATCH] fix(stream): data exec --- source/dnode/mnode/impl/src/mndSubscribe.c | 17 ++- source/libs/stream/src/streamExec.c | 122 +++++++++++---------- source/libs/wal/src/walRead.c | 2 + 3 files changed, 78 insertions(+), 63 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d67e4e8783..05e197150e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + pConsumerEp->consumerId); } imbCnt++; } @@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + pConsumerEp->consumerId); } } } @@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); } } @@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ASSERT(pConsumerEp->consumerId > 0); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; + if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { + mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); + continue; + } taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); } } else { // if all consumer is removed, put all vg into unassigned diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f6eb9e32f2..b59a812678 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { } static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { - int32_t cnt = 0; - void* data = NULL; while (1) { - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - qDebug("stream exec over, queue empty"); - break; - } - if (data == NULL) { - data = qItem; - if (qItem->type == STREAM_INPUT__DATA_BLOCK) { - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ - } - streamQueueProcessSuccess(pTask->inputQueue); - } else { - if (streamAppendQueueItem(data, qItem) < 0) { - streamQueueProcessFail(pTask->inputQueue); + int32_t cnt = 0; + void* data = NULL; + while (1) { + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + qDebug("stream exec over, queue empty"); break; - } else { - cnt++; - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + } + if (data == NULL) { + data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); - taosFreeQitem(qItem); + if (qItem->type == STREAM_INPUT__DATA_BLOCK) { + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + } else { + break; + } + } else { + if (streamAppendQueueItem(data, qItem) < 0) { + streamQueueProcessFail(pTask->inputQueue); + break; + } else { + cnt++; + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + streamQueueProcessSuccess(pTask->inputQueue); + taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); + taosFreeQitem(qItem); + } } } - } - if (pTask->taskStatus == TASK_STATUS__DROPPING) { - if (data) streamFreeQitem(data); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return NULL; - } + if (pTask->taskStatus == TASK_STATUS__DROPPING) { + if (data) streamFreeQitem(data); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } - if (data == NULL) return pRes; + if (data == NULL) break; - if (pTask->execType == TASK_EXEC__NONE) { - ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, data); - return pRes; - } + if (pTask->execType == TASK_EXEC__NONE) { + ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, data); + return pRes; + } - qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); - streamTaskExecImpl(pTask, data, pRes); - qDebug("stream task %d exec end", pTask->taskId); + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); + streamTaskExecImpl(pTask, data, pRes); + qDebug("stream task %d exec end", pTask->taskId); - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - if (qRes == NULL) { - streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); - return NULL; - } - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - if (streamTaskOutput(pTask, qRes) < 0) { - /*streamQueueProcessFail(pTask->inputQueue);*/ - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return NULL; - } - if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pSubmit->ver; + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + if (qRes == NULL) { + streamQueueProcessFail(pTask->inputQueue); + taosArrayDestroy(pRes); + return NULL; + } + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + if (streamTaskOutput(pTask, qRes) < 0) { + /*streamQueueProcessFail(pTask->inputQueue);*/ + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return NULL; + } + if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + qRes->childId = pTask->selfChildId; + qRes->sourceVer = pSubmit->ver; + } + /*streamQueueProcessSuccess(pTask->inputQueue);*/ + pRes = taosArrayInit(0, sizeof(SSDataBlock)); } - /*streamQueueProcessSuccess(pTask->inputQueue);*/ - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - } - streamFreeQitem(data); + streamFreeQitem(data); + } return pRes; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 908523f2a6..c47964803a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { pRead->pHead->head.version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); return -1; } @@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); return -1; } pRead->curVersion++; -- GitLab