From 7df1727a7b4bb701d32e4f7a8323c88eb0ffa8ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Jun 2022 15:24:57 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 26 +++---- source/libs/executor/src/executorimpl.c | 6 +- source/libs/executor/src/scanoperator.c | 97 +++++++++++++++---------- 3 files changed, 76 insertions(+), 53 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 061bdc7236..c6220b5ffe 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -303,13 +303,17 @@ typedef struct SScanInfo { int32_t numOfDesc; } SScanInfo; +typedef struct SSampleExecInfo { + double sampleRatio; // data block sample ratio, 1 by default + uint32_t seed; // random seed value +} SSampleExecInfo; + typedef struct STableScanInfo { void* dataReader; SReadHandle readHandle; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; - int64_t elapsedTime; // int32_t prevGroupId; // previous table group id SScanInfo scanInfo; int32_t scanTimes; @@ -330,9 +334,8 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. - + SSampleExecInfo sample; // sample execution info int32_t curTWinIdx; } STableScanInfo; @@ -692,7 +695,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, +int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); @@ -789,8 +792,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); -void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); - bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); @@ -825,16 +826,14 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); -STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, - SInterval* pInterval, int32_t precision, STimeWindow* win); -int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, - int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, - int32_t order); +STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, + int32_t precision, STimeWindow* win); +int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, + __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); -SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, - int32_t* pIndex); +SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); @@ -842,6 +841,7 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor); bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6434e3a016..6a464de689 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2524,7 +2524,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } } -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, +int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { if (pColList == NULL) { // data from other sources @@ -2679,7 +2679,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; - code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, + code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); @@ -2803,7 +2803,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SSDataBlock* pRes = pExchangeInfo->pResult; SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; int32_t code = - setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, + setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8541a0b129..ae6924eaff 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -40,6 +40,22 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); +static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); +static bool processBlockWithProbability(const SSampleExecInfo *pInfo); + +bool processBlockWithProbability(const SSampleExecInfo *pInfo) { +#if 0 + if (pInfo->sampleRatio == 1) { + return true; + } + + uint32_t val = taosRandR((uint32_t*) &pInfo->seed); + return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0; +#else + return true; +#endif +} + static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { SWITCH_ORDER(pCtx[i].order); @@ -153,8 +169,6 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); - static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -274,12 +288,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i]; TSWAP(pTWindow->skey, pTWindow->ekey); } - SQueryTableDataCond *pCond = &pTableScanInfo->cond; - taosqsort(pCond->twindows, - pCond->numOfTWindows, - sizeof(STimeWindow), - pCond, - compareTimeWindow); + + SQueryTableDataCond* pCond = &pTableScanInfo->cond; + taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow); } void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) { @@ -310,18 +321,19 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_ } else { // these are tags STagVal tagVal = {0}; tagVal.cid = pExpr->base.pParam[0].pCol->colId; - const char *p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal); + const char* p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal); - char *data = NULL; - if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL){ - data = tTagValToData((const STagVal *)p, false); - }else { + char* data = NULL; + if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { + data = tTagValToData((const STagVal*)p, false); + } else { data = (char*)p; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, (data == NULL)); } + if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); @@ -359,6 +371,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); uint32_t status = 0; @@ -433,6 +451,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } + while (pTableScanInfo->scanTimes < total) { while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { SSDataBlock* p = doTableScanImpl(pOperator); @@ -502,11 +521,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -517,7 +532,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -528,32 +543,40 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); - pInfo->sampleRatio = pTableScanNode->ratio; + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sample.sampleRatio= pTableScanNode->ratio; + pInfo->sample.seed = taosGetTimestampMs(); + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; - pInfo->curTWinIdx = 0; - - pOperator->name = "TableScanOperator"; // for debug purpose + pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; + + pOperator->name = "TableScanOperator"; // for debug purpose pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; - return pOperator; + + _error: + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { @@ -1351,7 +1374,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, + setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info -- GitLab