diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 412b4b4cf6c950113a2f7e6d319692695cde0c07..cfd5bd1ed77bc1a0c4f158f00cb8c2462ac70443 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); +bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 647da78a78895c21e6dd10da278d95b92bf85051..60efefb1e4505106c267b63291927d1c4736e430 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -136,6 +136,7 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; int8_t recoverStep; + int8_t recoverScanFinished; SQueryTableDataCond tableCond; int64_t fillHistoryVer1; int64_t fillHistoryVer2; @@ -182,7 +183,7 @@ struct SExecTaskInfo { SSubplan* pSubplan; struct SOperatorInfo* pRoot; SLocalFetch localFetch; - SArray* pResultBlockList;// result block list + SArray* pResultBlockList; // result block list STaskStopInfo stopInfo; }; @@ -199,7 +200,7 @@ typedef struct SOperatorFpSet { __optr_fn_t getNextFn; __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; - __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator + __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; __optr_explain_fn_t getExplainFn; @@ -255,22 +256,22 @@ typedef struct SLimitInfo { } SLimitInfo; typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; // SArray, result block list, used to keep the multi-block that // passed by downstream operator - SArray* pResultBlockList; - SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. - SSDataBlock* pDummyBlock; // dummy block, not keep data - bool seqLoadData; // sequential load data or not, false by default - int32_t current; + SArray* pResultBlockList; + SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy. + SSDataBlock* pDummyBlock; // dummy block, not keep data + bool seqLoadData; // sequential load data or not, false by default + int32_t current; SLoadRemoteDataInfo loadInfo; uint64_t self; SLimitInfo limitInfo; - int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo + int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo } SExchangeInfo; typedef struct SScanInfo { @@ -305,9 +306,9 @@ typedef struct { } SAggOptrPushDownInfo; typedef struct STableMetaCacheInfo { - SLRUCache* pTableMetaEntryCache; // 100 by default - uint64_t metaFetch; - uint64_t cacheHit; + SLRUCache* pTableMetaEntryCache; // 100 by default + uint64_t metaFetch; + uint64_t cacheHit; } STableMetaCacheInfo; typedef struct STableScanBase { @@ -325,16 +326,16 @@ typedef struct STableScanBase { } STableScanBase; typedef struct STableScanInfo { - STableScanBase base; - SScanInfo scanInfo; - int32_t scanTimes; - SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - int32_t currentGroupId; - int32_t currentTable; - int8_t scanMode; - int8_t assignBlockUid; - bool hasGroupByTag; + STableScanBase base; + SScanInfo scanInfo; + int32_t scanTimes; + SSDataBlock* pResBlock; + SSampleExecInfo sample; // sample execution info + int32_t currentGroupId; + int32_t currentTable; + int8_t scanMode; + int8_t assignBlockUid; + bool hasGroupByTag; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -360,11 +361,11 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; typedef struct STagScanInfo { - SColumnInfo* pCols; - SSDataBlock* pRes; - SColMatchInfo matchInfo; - int32_t curPos; - SReadHandle readHandle; + SColumnInfo* pCols; + SSDataBlock* pRes; + SColMatchInfo matchInfo; + int32_t curPos; + SReadHandle readHandle; } STagScanInfo; typedef enum EStreamScanMode { @@ -468,6 +469,11 @@ typedef struct SStreamScanInfo { SNodeList* pGroupTags; SNode* pTagCond; SNode* pTagIndexCond; + + // recover + int32_t blockRecoverContiCnt; + int32_t blockRecoverTotCnt; + } SStreamScanInfo; typedef struct { @@ -499,8 +505,8 @@ typedef struct STableCountScanOperatorInfo { STableCountScanSupp supp; - int32_t currGrpIdx; - SArray* stbUidList; // when group by db_name and/or stable_name + int32_t currGrpIdx; + SArray* stbUidList; // when group by db_name and/or stable_name } STableCountScanOperatorInfo; typedef struct SOptrBasicInfo { @@ -678,19 +684,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 void destroyOperatorInfo(SOperatorInfo* pOperator); int32_t optrDefaultBufFn(SOperatorInfo* pOperator); -void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); -void cleanupBasicInfo(SOptrBasicInfo* pInfo); +void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); +void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); -void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); +void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); void cleanupAggSup(SAggSupporter* pAggSup); -void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); +void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); @@ -803,10 +809,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t checkForQueryBuf(size_t numOfTables); -bool isTaskKilled(SExecTaskInfo* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); -void doDestroyTask(SExecTaskInfo* pTaskInfo); -void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); +bool isTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); +void doDestroyTask(SExecTaskInfo* pTaskInfo); +void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, char* sql, EOPTR_EXEC_MODEL model); @@ -828,8 +834,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); -uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); -void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); +uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); +void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 75de0129470963c34edb9922f5d8e3c6cd9fc40d..f83f938e61c18875fc4998304cfd4bd5d8d78c0b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -937,6 +937,11 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { return 0; } +bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.recoverScanFinished; +} + void* qExtractReaderFromStreamScanner(void* scanner) { SStreamScanInfo* pInfo = scanner; return (void*)pInfo->tqReader; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1414d3b4ab310b427491f51f982c60f4ddb21107..175e52565f919cf3e5c2dc104e04b6392184a2ba 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1785,11 +1785,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; + pTaskInfo->streamInfo.recoverScanFinished = false; } if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { + if (pInfo->blockRecoverContiCnt > 100) { + pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt; + pInfo->blockRecoverContiCnt = 0; + return NULL; + } SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); if (pBlock != NULL) { + pInfo->blockRecoverContiCnt++; calBlockTbName(pInfo, pBlock); if (pInfo->pUpdateInfo) { TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); @@ -1807,6 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.startVersion = -1; pTSInfo->base.cond.endVersion = -1; + pTaskInfo->streamInfo.recoverScanFinished = true; return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b5bceb9b94fb8e819e0774a477286fd8b14e4a24..415fa41e56f84ef0b25865ebd7ce7a44dbe92727 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -112,7 +112,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ASSERT(0); } if (output == NULL) { - finished = true; + if (qStreamRecoverScanFinished(exec)) { + finished = true; + } else { + qSetStreamOpOpen(exec); + } break; }