未验证 提交 f9d643dd 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19178 from taosdata/fix/set_scan_limit

enh: set scan limit
...@@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver); ...@@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -136,6 +136,7 @@ typedef struct { ...@@ -136,6 +136,7 @@ typedef struct {
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep; int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t fillHistoryVer1; int64_t fillHistoryVer1;
int64_t fillHistoryVer2; int64_t fillHistoryVer2;
...@@ -182,7 +183,7 @@ struct SExecTaskInfo { ...@@ -182,7 +183,7 @@ struct SExecTaskInfo {
SSubplan* pSubplan; SSubplan* pSubplan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
SArray* pResultBlockList;// result block list SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
}; };
...@@ -199,7 +200,7 @@ typedef struct SOperatorFpSet { ...@@ -199,7 +200,7 @@ typedef struct SOperatorFpSet {
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_encode_fn_t encodeResultRow; __optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow; __optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn; __optr_explain_fn_t getExplainFn;
...@@ -255,22 +256,22 @@ typedef struct SLimitInfo { ...@@ -255,22 +256,22 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
...@@ -305,9 +306,9 @@ typedef struct { ...@@ -305,9 +306,9 @@ typedef struct {
} SAggOptrPushDownInfo; } SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo { typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch; uint64_t metaFetch;
uint64_t cacheHit; uint64_t cacheHit;
} STableMetaCacheInfo; } STableMetaCacheInfo;
typedef struct STableScanBase { typedef struct STableScanBase {
...@@ -325,46 +326,46 @@ typedef struct STableScanBase { ...@@ -325,46 +326,46 @@ typedef struct STableScanBase {
} STableScanBase; } STableScanBase;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STableScanBase base; STableScanBase base;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
int8_t scanMode; int8_t scanMode;
int8_t assignBlockUid; int8_t assignBlockUid;
bool hasGroupByTag; bool hasGroupByTag;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond SArray* queryConds; // array of queryTableDataCond
STableScanBase base; STableScanBase base;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo* pCols; SColumnInfo* pCols;
SSDataBlock* pRes; SSDataBlock* pRes;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -468,6 +469,11 @@ typedef struct SStreamScanInfo { ...@@ -468,6 +469,11 @@ typedef struct SStreamScanInfo {
SNodeList* pGroupTags; SNodeList* pGroupTags;
SNode* pTagCond; SNode* pTagCond;
SNode* pTagIndexCond; SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
...@@ -499,8 +505,8 @@ typedef struct STableCountScanOperatorInfo { ...@@ -499,8 +505,8 @@ typedef struct STableCountScanOperatorInfo {
STableCountScanSupp supp; STableCountScanSupp supp;
int32_t currGrpIdx; int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo; } STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
...@@ -678,19 +684,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 ...@@ -678,19 +684,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
void destroyOperatorInfo(SOperatorInfo* pOperator); void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator); int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
...@@ -803,10 +809,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, ...@@ -803,10 +809,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model); char* sql, EOPTR_EXEC_MODEL model);
...@@ -828,8 +834,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); ...@@ -828,8 +834,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName); uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......
...@@ -936,6 +936,10 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -936,6 +936,10 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
} }
return 0; return 0;
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;
}
void* qExtractReaderFromStreamScanner(void* scanner) { void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner; SStreamScanInfo* pInfo = scanner;
......
...@@ -744,8 +744,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -744,8 +744,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -962,8 +962,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -962,8 +962,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader, int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
GET_TASKID(pTaskInfo)); (STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
...@@ -971,7 +971,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -971,7 +971,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
} }
if (tsdbNextDataBlock(pReader)) { if (tsdbNextDataBlock(pReader)) {
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
} }
...@@ -1200,7 +1200,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1200,7 +1200,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
...@@ -1729,11 +1729,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1729,11 +1729,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
pTaskInfo->streamInfo.recoverScanFinished = false;
} }
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
return NULL;
}
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) { if (pBlock != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pBlock); calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
...@@ -1751,6 +1758,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1751,6 +1758,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = -1; pTSInfo->base.cond.startVersion = -1;
pTSInfo->base.cond.endVersion = -1; pTSInfo->base.cond.endVersion = -1;
pTaskInfo->streamInfo.recoverScanFinished = true;
return NULL; return NULL;
} }
...@@ -2262,7 +2270,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2262,7 +2270,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL); code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
&pTSInfo->base.dataReader, NULL);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
destroyTableScanOperatorInfo(pTableScanOp); destroyTableScanOperatorInfo(pTableScanOp);
...@@ -2333,7 +2342,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2333,7 +2342,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator; return pOperator;
...@@ -2470,7 +2480,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2470,7 +2480,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator; return pOperator;
...@@ -2491,11 +2502,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2491,11 +2502,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); int32_t code =
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -2893,8 +2905,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* ...@@ -2893,8 +2905,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo*
SSDataBlock* pRes, char* dbName); SSDataBlock* pRes, char* dbName);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes); STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
...@@ -3019,8 +3031,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC ...@@ -3019,8 +3031,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
return pOperator; return pOperator;
_error: _error:
......
...@@ -116,7 +116,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -116,7 +116,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(0); ASSERT(0);
} }
if (output == NULL) { if (output == NULL) {
finished = true; if (qStreamRecoverScanFinished(exec)) {
finished = true;
} else {
qSetStreamOpOpen(exec);
}
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册