diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5c27aed625f44242ac4ac97270a445df104ac2d7..fce798d92901d6ed6b448401775b1501bab6d60b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -253,6 +253,8 @@ typedef struct SQueryRuntimeEnv { char* tagVal; // tag value of current data block SArithmeticSupport *sasArray; + + struct STableScanInfo* pi; } SQueryRuntimeEnv; enum { @@ -312,6 +314,7 @@ typedef struct SSDataBlock { } SSDataBlock; typedef struct STableScanInfo { + SQInfo* pQInfo; void *pQueryHandle; int32_t numOfBlocks; int32_t numOfSkipped; @@ -321,6 +324,9 @@ typedef struct STableScanInfo { int32_t order; // scan order int32_t times; // repeat counts + int32_t current; + + int32_t reverseTimes; // 0 by default SSDataBlock block; int64_t elapsedTime; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3f22a132e88875c2ed304ed147c00d74ed719fa4..df787626a4a78f8b3ce1e86b7579649c4c1165c7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -172,6 +172,10 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STableIdInfo createTableIdInfo(SQuery* pQuery); +static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime); +static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime); +static int32_t getNumOfScanTimes(SQuery* pQuery); + bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; @@ -3461,6 +3465,45 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI } } +static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) { + SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery *pQuery = pRuntimeEnv->pQuery; + + if (pRuntimeEnv->pTsBuf) { + SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); + bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); + assert(ret); + } + + // reverse order time range + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); + SWITCH_ORDER(pQuery->order.order); + + if (QUERY_IS_ASC_QUERY(pQuery)) { + assert(pQuery->window.skey <= pQuery->window.ekey); + } else { + assert(pQuery->window.skey >= pQuery->window.ekey); + } + + SET_REVERSE_SCAN_FLAG(pRuntimeEnv); + STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + switchCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pQInfo); + setupQueryRangeForReverseScan(pQInfo); + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } + + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + if (pRuntimeEnv->pSecQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } +} + static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; @@ -4643,6 +4686,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); + if (needReverseScan(pQuery)) { + pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery), 1); + } else { + pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery)); + } + if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); @@ -5586,59 +5635,128 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { } static SSDataBlock* doTableScan(void* param) { - STableScanInfo* pTableScanInfo = (STableScanInfo*) param; + STableScanInfo * pTableScanInfo = (STableScanInfo *)param; + SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; - SSDataBlock* pBlock = &pTableScanInfo->block; - while(tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { - pTableScanInfo->numOfBlocks += 1; + SSDataBlock *pBlock = &pTableScanInfo->block; + while (pTableScanInfo->current < pTableScanInfo->times) { + while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + pTableScanInfo->numOfBlocks += 1; - // todo check for query cancel + // todo check for query cancel - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - SDataStatis *pStatis = pBlock->pBlockStatis; + SDataStatis *pStatis = pBlock->pBlockStatis; - // this function never returns error? - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); - pTableScanInfo->numOfBlockStatis += 1; + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); + pTableScanInfo->numOfBlockStatis += 1; - if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); - pTableScanInfo->numOfRows += pBlock->info.rows; + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pTableScanInfo->numOfRows += pBlock->info.rows; + } + + return pBlock; } - return pBlock; - } + if (++pTableScanInfo->current >= pTableScanInfo->times) { + return NULL; + } - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); - if (pRuntimeEnv->pSecQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } + tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); + STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); + pTableScanInfo->pQueryHandle = + tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, + pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); + if (pTableScanInfo->pQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } - pRuntimeEnv->resultRowInfo.curIndex = qstatus.windowIndex; - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - pRuntimeEnv->scanFlag = REPEAT_SCAN; + pRuntimeEnv->resultRowInfo.curIndex = 0; + setQueryStatus(pRuntimeEnv->pQuery, QUERY_NOT_COMPLETED); + pRuntimeEnv->scanFlag = REPEAT_SCAN; - if (pRuntimeEnv->pTsBuf) { - bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - assert(ret); + if (pRuntimeEnv->pTsBuf) { + bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); + assert(ret); + } + + qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); } - qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo, - cond.twindow.skey, cond.twindow.ekey); + if (pTableScanInfo->reverseTimes > 0) { + setEnvBeforeReverseScan_rv(pRuntimeEnv); + + tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); + + STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); + pTableScanInfo->pQueryHandle = + tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, + pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); + + while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + pTableScanInfo->numOfBlocks += 1; + + // todo check for query cancel + tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + + SDataStatis *pStatis = pBlock->pBlockStatis; + + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); + pTableScanInfo->numOfBlockStatis += 1; + + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pTableScanInfo->numOfRows += pBlock->info.rows; + } + + return pBlock; + } + + + qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); + } return NULL; } -static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, int32_t repeatTime) { +static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime) { + assert(repeatTime > 0); + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->apply = doTableScan; pInfo->times = repeatTime; + pInfo->reverseTimes = 0; + + pInfo->current = 0; + pInfo->pQInfo = pQInfo; return pInfo; } +static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime) { + assert(repeatTime > 0); + + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->apply = doTableScan; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + + pInfo->current = 0; + pInfo->pQInfo = pQInfo; + return pInfo; +} + +static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) { + return pTableScanInfo->current; +} + // this is a blocking operator static SSDataBlock* doAggOperator(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; @@ -5662,12 +5780,19 @@ static SSDataBlock* doAggOperator(void* param) { pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData; } + pQuery->pos = 0; + + int32_t countId = 0; while(1) { SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); if (pBlock == NULL) { break; } + if (countId != getTableScanTime(pInfo->pTableScanInfo)) { + needRepeatScan(pInfo->pRuntimeEnv); + } + blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock); } @@ -5678,6 +5803,18 @@ static SSDataBlock* doAggOperator(void* param) { return res; } +// todo set the attribute of query scan count +static int32_t getNumOfScanTimes(SQuery* pQuery) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functionId = pQuery->pExpr1[i].base.functionId; + if (functionId == TSDB_FUNC_STDDEV || functionId == TSDB_FUNC_PERCT) { + return 2; + } + } + + return 1; +} + static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -5705,20 +5842,15 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) return; } - STableScanInfo* pi = createTableScanInfo(pRuntimeEnv->pQueryHandle); - SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pi); + SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo); // scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); - // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. + // since the numOfRows must be identical for all functions that are allowed to be executed simutaneously. // pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv); - doSecondaryArithmeticProcess(pQuery); - - if (isQueryKilled(pQInfo)) { - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } +// doSecondaryArithmeticProcess(pQuery); // TODO limit/offset refactor to be one operator skipResults(pRuntimeEnv);