提交 038e6fbe 编写于 作者: 5 54liuyao

fix(tmq): tmq assert

上级 a94f90a3
......@@ -903,7 +903,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
int32_t rows = 0;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
......@@ -1023,9 +1022,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno;
return NULL;
}
rows = pBlockInfo->rows;
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
......@@ -1033,14 +1029,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
break;
if (pBlockInfo->rows > 0) {
break;
}
}
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
if (rows == 0) {
if (pBlockInfo->rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0;
......@@ -1056,7 +1054,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
return (rows == 0) ? NULL : pInfo->pRes;
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
}
}
......
......@@ -1270,7 +1270,8 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
}
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
return pWin->ekey < pSup->maxTs - pSup->waterMark;
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
......@@ -2141,7 +2142,9 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, int32_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray);
ASSERT(pInfo->pChildren);
if (!pInfo->pChildren) {
return;
}
for (int32_t i = 0; i < size; i++) {
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
......@@ -2339,6 +2342,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
}
......@@ -2406,7 +2410,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void *));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册