提交 b1bd24d1 编写于 作者: L Liu Jicong

enh: scan exec set limit

上级 69d41059
......@@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
#ifdef __cplusplus
}
......
......@@ -136,6 +136,7 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
......@@ -182,7 +183,7 @@ struct SExecTaskInfo {
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
SArray* pResultBlockList;// result block list
SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo;
};
......@@ -199,7 +200,7 @@ typedef struct SOperatorFpSet {
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
......@@ -255,22 +256,22 @@ typedef struct SLimitInfo {
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SArray* pResultBlockList;
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo;
typedef struct SScanInfo {
......@@ -305,9 +306,9 @@ typedef struct {
} SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch;
uint64_t cacheHit;
SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch;
uint64_t cacheHit;
} STableMetaCacheInfo;
typedef struct STableScanBase {
......@@ -325,47 +326,47 @@ typedef struct STableScanBase {
} STableScanBase;
typedef struct STableScanInfo {
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
} STableScanInfo;
typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
} STagScanInfo;
typedef enum EStreamScanMode {
......@@ -469,6 +470,11 @@ typedef struct SStreamScanInfo {
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
} SStreamScanInfo;
typedef struct {
......@@ -500,8 +506,8 @@ typedef struct STableCountScanOperatorInfo {
STableCountScanSupp supp;
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo {
......@@ -680,19 +686,19 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
......@@ -805,10 +811,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model);
......@@ -830,8 +836,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......
......@@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
taosArrayPush(pTaskInfo->pResultBlockList, &p1);
p = p1;
} else {
p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
copyDataBlock(p, pRes);
}
......@@ -574,9 +574,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SArray* pList = pTaskInfo->pResultBlockList;
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SArray* pList = pTaskInfo->pResultBlockList;
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
blockDataDestroy(*p);
}
......@@ -747,11 +747,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
}
int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput);
// *len = 0;
// }
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput);
// *len = 0;
// }
return 0;
}
......@@ -763,7 +763,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
}
return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
}
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
......@@ -937,6 +937,11 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
return 0;
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......
......@@ -1785,11 +1785,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
pTaskInfo->streamInfo.recoverScanFinished = false;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
return NULL;
}
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) {
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
......@@ -1807,6 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = -1;
pTSInfo->base.cond.endVersion = -1;
pTaskInfo->streamInfo.recoverScanFinished = true;
return NULL;
}
......
......@@ -112,7 +112,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(0);
}
if (output == NULL) {
finished = true;
if (qStreamRecoverScanFinished(exec)) {
finished = true;
} else {
qSetStreamOpOpen(exec);
}
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册