提交 053786e6 编写于 作者: L Liu Jicong

fix(stream): data exec

上级 158c6ae3
...@@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
pConsumerEp->consumerId);
} }
imbCnt++; imbCnt++;
} }
...@@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
pConsumerEp->consumerId);
} }
} }
} }
...@@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
} }
} }
...@@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
ASSERT(pConsumerEp->consumerId > 0); ASSERT(pConsumerEp->consumerId > 0);
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
continue;
}
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
} }
} else { } else {
// if all consumer is removed, put all vg into unassigned // if all consumer is removed, put all vg into unassigned
......
...@@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
} }
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
int32_t cnt = 0;
void* data = NULL;
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); int32_t cnt = 0;
if (qItem == NULL) { void* data = NULL;
qDebug("stream exec over, queue empty"); while (1) {
break; SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
} if (qItem == NULL) {
if (data == NULL) { qDebug("stream exec over, queue empty");
data = qItem;
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
}
streamQueueProcessSuccess(pTask->inputQueue);
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break; break;
} else { }
cnt++; if (data == NULL) {
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
taosFreeQitem(qItem); /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
} else {
break;
}
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
} }
} }
} if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (data) streamFreeQitem(data);
if (data) streamFreeQitem(data); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return NULL;
return NULL; }
}
if (data == NULL) return pRes; if (data == NULL) break;
if (pTask->execType == TASK_EXEC__NONE) { if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data); streamTaskOutput(pTask, data);
return pRes; return pRes;
} }
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return NULL; return NULL;
} }
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) { if (streamTaskOutput(pTask, qRes) < 0) {
/*streamQueueProcessFail(pTask->inputQueue);*/ /*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes); taosFreeQitem(qRes);
return NULL; return NULL;
} }
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
} }
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
streamFreeQitem(data); streamFreeQitem(data);
}
return pRes; return pRes;
} }
......
...@@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
pRead->pHead->head.version, ver); pRead->pHead->head.version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1; return -1;
} }
...@@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1; return -1;
} }
pRead->curVersion++; pRead->curVersion++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册