From 35ddd566915beba0438c994b7f79a75b0ab987e9 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Jul 2022 09:55:21 +0800 Subject: [PATCH] feat(stream): refector some log --- source/common/src/tdatablock.c | 4 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/timewindowoperator.c | 48 +++++++++---------- tests/script/jenkins/basic.txt | 2 +- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f0b5de9838..e516bddac1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1763,9 +1763,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; - len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|\n", flag, + len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, - pDataBlock->info.uid); + pDataBlock->info.uid, pDataBlock->info.rows); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0beb6f1784..ce782c7a8f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1000,6 +1000,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); +void printDataBlock(SSDataBlock* pBlock, const char* flag); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1e001a29a0..d69ceaf8df 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2769,7 +2769,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; - qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2778,7 +2778,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->pPullDataRes->info.rows != 0) { // process the rest of the data ASSERT(IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->pPullDataRes; } @@ -2793,20 +2793,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } return NULL; } - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; } else { if (!IS_FINAL_OP(pInfo)) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows != 0) { - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; } } if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { pInfo->returnUpdate = false; ASSERT(!IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); // process the rest of the data return pInfo->pUpdateRes; } @@ -2814,13 +2814,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { // if (pInfo->pPullDataRes->info.rows != 0) { // // process the rest of the data // ASSERT(IS_FINAL_OP(pInfo)); - // printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + // printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); // return pInfo->pPullDataRes; // } doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->pDelRes; } } @@ -2831,10 +2831,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearSpecialDataBlock(pInfo->pUpdateRes); removeDeleteResults(pUpdated, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; - qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); break; } - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); maxTs = TMAX(maxTs, pBlock->info.window.ekey); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || @@ -2934,20 +2934,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->pPullDataRes->info.rows != 0) { // process the rest of the data ASSERT(IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->pPullDataRes; } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows != 0) { - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; } if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { pInfo->returnUpdate = false; ASSERT(!IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); // process the rest of the data return pInfo->pUpdateRes; } @@ -2955,7 +2955,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->pDelRes; } // ASSERT(false); @@ -3815,14 +3815,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); + printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -3835,7 +3835,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv"); + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv"); if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); @@ -3912,11 +3912,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); + printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -3955,21 +3955,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "Semi Session"); + printDataBlock(pBInfo->pRes, "sems session"); return pBInfo->pRes; } // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { pInfo->returnDelete = true; - printDataBlock(pInfo->pDelRes, "Semi Session"); + printDataBlock(pInfo->pDelRes, "sems session"); return pInfo->pDelRes; } if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; - printDataBlock(pInfo->pUpdateRes, "Semi Session"); + printDataBlock(pInfo->pUpdateRes, "sems session"); return pInfo->pUpdateRes; } // semi interval operator clear disk buffer @@ -4033,21 +4033,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "Semi Session"); + printDataBlock(pBInfo->pRes, "sems session"); return pBInfo->pRes; } // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { pInfo->returnDelete = true; - printDataBlock(pInfo->pDelRes, "Semi Session"); + printDataBlock(pInfo->pDelRes, "sems session"); return pInfo->pDelRes; } if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; - printDataBlock(pInfo->pUpdateRes, "Semi Session"); + printDataBlock(pInfo->pUpdateRes, "sems session"); return pInfo->pUpdateRes; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a606311f3c..0c19e4a2fe 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -224,7 +224,7 @@ # ---- stream ./test.sh -f tsim/stream/basic0.sim -#./test.sh -f tsim/stream/basic1.sim +./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/distributeInterval0.sim -- GitLab