diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a238f27e8836072f4cf3e906baf989b9b63e2689..1b26268978b0c43f5fbd78c4cbfb3e29729c5907 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2817,11 +2817,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { offset = pMsg - (char *)pMetaMsg; pElem->tableCond = htonl(offset); - - uint32_t len = strlen(pTagCond->tbnameCond.cond); + + uint32_t len = 0; + if (pTagCond->tbnameCond.cond != NULL) { + len = strlen(pTagCond->tbnameCond.cond); + memcpy(pMsg, pTagCond->tbnameCond.cond, len); + } + pElem->tableCondLen = htonl(len); - - memcpy(pMsg, pTagCond->tbnameCond.cond, len); pMsg += len; } diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index fe54233bc9a6bb2ac098a3b824050f08652a3fec..efad07ac24c2ece25ffe49167b09e24011b5081a 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -153,7 +153,6 @@ void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv); void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv); int64_t getNumOfResult(SQueryRuntimeEnv* pRuntimeEnv); -void forwardIntervalQueryRange(STableQuerySupportObj* pSupporter, SQueryRuntimeEnv* pRuntimeEnv); void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv); bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySupportObj* pSupporter, @@ -171,12 +170,10 @@ void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter); void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result); -SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType); -SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); +SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv); +SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pBlock, int32_t type); -// void queryOnBlock(STableQuerySupportObj* pSupporter, int32_t blockStatus, SBlockInfo* pBlockBasicInfo, -// SMeterDataInfo* pDataHeadInfoEx, SField* pFields, -// __block_search_fn_t searchFn); +SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo, SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn); @@ -190,14 +187,13 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMet int32_t* nAllocBlocksInfoSize, int64_t addr); void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len); -void setExecutionContext(STableQuerySupportObj* pSupporter, SWindowResult* outputRes, int32_t meterIdx, - int32_t groupIdx, SMeterQueryInfo* sqinfo); -int32_t setIntervalQueryExecutionContext(STableQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); +void setExecutionContext(STableQuerySupportObj* pSupporter, SMeterQueryInfo* pMeterQueryInfo, int32_t meterIdx, + int32_t groupIdx, TSKEY nextKey); +int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo); void doGetAlignedIntervalQueryRangeImpl(SQuery* pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, int64_t* actualSkey, int64_t* actualEkey, int64_t* skey, int64_t* ekey); int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange); -int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); int32_t getDataBlocksForMeters(STableQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters, const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 8b435df5bf8991043c5b0125c12e49ecaa1b1b5c..4d28a9a3a974db55de58bb65aeb3a1f94647ad64 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -169,19 +169,16 @@ typedef struct SQueryRuntimeEnv { STSBuf* pTSBuf; STSCursor cur; SQueryCostSummary summary; - - STimeWindow intervalWindow; // the complete time window, not affected by the actual data distribution + bool stableQuery; // is super table query or not + SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file /* * Temporarily hold the in-memory cache block info during scan cache blocks - * Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime - * during the query by the subumit/insert handling threads. + * Here we do not use the cache block info from pMeterObj, simple because it may change anytime + * during the query by the submit/insert handling threads. * So we keep a copy of the support structure as well as the cache block data itself. */ SCacheBlock cacheBlock; - - SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file - bool stableQuery; // is super table query or not } SQueryRuntimeEnv; /* intermediate pos during multimeter query involves interval */ @@ -222,13 +219,11 @@ typedef struct STableQuerySupportObj { * rows may be generated by a specific subgroup. When query on all subgroups is executed, * the result is copy to output buffer. This attribution is not used during single meter query processing. */ -// SWindowResult* pResult; SQueryRuntimeEnv runtimeEnv; int64_t rawSKey; int64_t rawEKey; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ - tSidSet* pSidSet; /* diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9c110dbc7875becb0cc9d0c8adbf364536334079..9cfa313ca7f798d72d9596c51f1c1e15e8850dbf 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -70,11 +70,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv); -static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); -static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); -static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); -static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); +// static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); +static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); +static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); +static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); static int32_t getGroupResultId(int32_t groupIndex) { int32_t base = 200000; @@ -984,7 +984,6 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR return ret; } -// todo ignore the blockType, pass the pQuery into this function SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_t blockType) { SBlockInfo blockInfo = {0}; if (IS_FILE_BLOCK(blockType)) { @@ -1030,7 +1029,7 @@ static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, in setQueryStatus(pQuery, QUERY_COMPLETED); return true; } - + // output buffer is full, pause current query if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) || @@ -1038,11 +1037,11 @@ static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, in return true; } - + if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) { return true; } - + // query completed if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { @@ -1247,11 +1246,10 @@ static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim } } -static SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; +SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; - void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + void *pBlock = getGenericDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); assert(pBlock != NULL); int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; @@ -1444,14 +1442,13 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa return dataBlock; } -static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { +static SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return (pWindowResInfo->pResult[slot].status.closed == true); + return &pWindowResInfo->pResult[slot]; } -static SWindowResult* getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { - assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return &pWindowResInfo->pResult[slot]; +static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { + return (getWindowResult(pWindowResInfo, slot)->status.closed == true); } static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) { @@ -1491,7 +1488,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); } - return &pWindowResInfo->pResult[pWindowResInfo->curIndex]; + return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); } // get the correct time window according to the handled timestamp @@ -1503,7 +1500,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = w.skey + pQuery->intervalTime - 1; } else { int32_t slot = curTimeWindow(pWindowResInfo); - w = pWindowResInfo->pResult[slot].window; + w = getWindowResult(pWindowResInfo, slot)->window; } if (w.skey > ts || w.ekey < ts) { @@ -1522,19 +1519,16 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = w.skey + pQuery->intervalTime - 1; } - // query border check + /* + * query border check, skey should not be bounded by the query time range, since the value skey will + * be used as the time window index value. So we only change ekey of time window accordingly. + */ if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) { w.ekey = pQuery->ekey; } - if (w.skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)) { - w.skey = pQuery->ekey; - } assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); - if (w.skey == 1542597000000) { - int32_t k = 1; - } - + return w; } @@ -1568,9 +1562,6 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR return -1; } - if (pageId == 153 && pData->numOfElems >= 138) { - int32_t k = 1; - } // set the number of rows in current disk page if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pos.pageId = pageId; @@ -1724,10 +1715,6 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, } } } - - if (pQuery->lastKey == 1542597000001) { - int32_t k = 1; - } assert(num >= 0); return num; @@ -1752,7 +1739,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat } } -static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, int32_t offset) { +static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, + int32_t offset) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -1854,9 +1842,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey; - setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, - pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); + setExecParams(pQuery, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, + hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1865,19 +1852,23 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t TSKEY ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) { - int32_t k = 1; - } - - if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && !IS_MASTER_SCAN(pRuntimeEnv)) { - int32_t k = 1; - } - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } - - TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey; + + TSKEY ekey = -1; + if (QUERY_IS_ASC_QUERY(pQuery)) { + ekey = win.ekey; + if (ekey > pQuery->ekey) { + ekey = pQuery->ekey; + } + } else { + ekey = win.skey; + if (ekey < pQuery->ekey) { + ekey = pQuery->ekey; + } + } + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); @@ -1892,18 +1883,26 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t if (startPos < 0) { break; } - - if (nextWin.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) { - int32_t k = 1; - } - + // null data, failed to allocate more memory buffer int32_t sid = pRuntimeEnv->pMeterObj->sid; if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) { break; } - ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey; + ekey = -1; + if (QUERY_IS_ASC_QUERY(pQuery)) { + ekey = nextWin.ekey; + if (ekey > pQuery->ekey) { + ekey = pQuery->ekey; + } + } else { + ekey = nextWin.skey; + if (ekey < pQuery->ekey) { + ekey = pQuery->ekey; + } + } + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); @@ -1934,8 +1933,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } - validateTimestampForSupplementResult(pRuntimeEnv, num); - tfree(sasArray); return (int32_t)num; } @@ -2137,8 +2134,7 @@ int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { } void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { - assert(slot >= 0 && slot < pWindowResInfo->size); - pWindowResInfo->pResult[slot].status.closed = true; + getWindowResult(pWindowResInfo, slot)->status.closed = true; } void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { @@ -2283,7 +2279,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey; + TSKEY ts = pQuery->skey; // QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : + // pRuntimeEnv->intervalWindow.ekey; setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -2408,7 +2405,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * * requires checking buffer during loop */ if ((pQuery->checkBufferInLoop == 1) && (++numOfRes) >= pQuery->pointsOffset) { - pQuery->lastKey = primaryKeyCol[pQuery->pos + j * step] + step; + pQuery->lastKey = lastKey + step; *forwardStep = j + 1; break; } @@ -2473,9 +2470,9 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY !QUERY_IS_ASC_QUERY(pQuery))); } -static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn, - SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes, - SWindowResInfo *pWindowResInfo) { +static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, + int64_t *pPrimaryColumn, SField *pFields, __block_search_fn_t searchFn, + int32_t *numOfRes, SWindowResInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo); @@ -2500,12 +2497,13 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - + // interval query with limit applied - if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 && pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) { + if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 && + pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) { setQueryStatus(pQuery, QUERY_COMPLETED); } - + assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points @@ -3417,7 +3415,6 @@ static bool doGetQueryPos(TSKEY key, STableQuerySupportObj *pSupporter, SPointIn if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */ return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } else { - // getAlignedIntervalQueryRange(pRuntimeEnv, key, pQuery->skey, pQuery->ekey); return true; } } else { // key > pQuery->ekey, abort for normal query, continue for interp query @@ -3449,7 +3446,6 @@ static bool doSetDataInfo(STableQuerySupportObj *pSupporter, SPointInterpoSuppor return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } else { - // getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pQuery->skey, pQuery->ekey); return true; } } @@ -3554,69 +3550,6 @@ static int64_t getGreaterEqualTimestamp(SQueryRuntimeEnv *pRuntimeEnv) { return key; } -static void getActualRange(STableQuerySupportObj *pSupporter, STimeWindow *pTimeWindow) { - SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; - __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - - int32_t order = pQuery->order.order; - SWAP(pQuery->skey, pQuery->ekey, TSKEY); - pQuery->lastKey = pQuery->skey; - - if (QUERY_IS_ASC_QUERY(pQuery)) { // do the desc check first for asc query - pQuery->order.order ^= 1u; - - TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - if (t > 0) { - pTimeWindow->ekey = t; - } else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) { - pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - } - - pQuery->order.order = order; - SWAP(pQuery->skey, pQuery->ekey, TSKEY); - pQuery->lastKey = pQuery->skey; - - if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { - pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - } else { // set no data in file - pQuery->fileId = -1; - pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - } - - pQuery->skey = pTimeWindow->skey; - pQuery->ekey = pTimeWindow->ekey; - } else { - pQuery->order.order ^= 1u; - - if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { - pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - } else { // set no data in file - pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - } - - // reverse check for maxValue in query range - SWAP(pQuery->skey, pQuery->ekey, TSKEY); - pQuery->order.order ^= 1u; - - // set no data in file - pQuery->lastKey = pQuery->skey; - - TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - if (t > 0) { - pTimeWindow->ekey = t; - } else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) { - pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - } - - pQuery->ekey = pTimeWindow->skey; - pQuery->skey = pTimeWindow->ekey; - } - - pQuery->order.order = order; -} - /** * determine the first query range, according to raw query range [skey, ekey] and group-by interval. * the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than @@ -5213,14 +5146,12 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; int64_t start = taosGetTimestampUs(); + *pblockInfo = getBlockInfo(pRuntimeEnv); if (IS_DISK_DATA_BLOCK(pQuery)) { - SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot); - *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK); - if (blockLoadStatus == DISK_DATA_LOADED) { *forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], - searchFn, numOfRes, &pRuntimeEnv->windowResInfo); + searchFn, numOfRes, &pRuntimeEnv->windowResInfo); } else { *forwardStep = pblockInfo->size; } @@ -5228,12 +5159,8 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl pSummary->fileTimeUs += (taosGetTimestampUs() - start); } else { assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); - - SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); - *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); - *forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes, - &pRuntimeEnv->windowResInfo); + &pRuntimeEnv->windowResInfo); pSummary->cacheTimeUs += (taosGetTimestampUs() - start); } @@ -5302,7 +5229,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (!Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED) && Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { if (nextPos >= blockInfo.size || nextPos < 0) { moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); - + // slot/pos/fileId is updated in moveToNextBlock function savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); } else { @@ -5404,7 +5331,7 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM } } -static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage *inputSrc, int32_t inputIdx, +static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult* pWindowRes, /*int32_t inputIdx,*/ bool mergeFlag) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -5421,9 +5348,10 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; - pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + - ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + - pCtx[i].outputBytes * inputIdx; + pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); +// pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + +// ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + +// pCtx[i].outputBytes * inputIdx; // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { @@ -5578,7 +5506,7 @@ int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) { return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx); } -int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param) { +int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { int32_t left = *(int32_t *)pLeft; int32_t right = *(int32_t *)pRight; @@ -5598,6 +5526,7 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param return -1; } + //!!!!! tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx); int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx); @@ -5695,6 +5624,32 @@ void copyResToQueryResultBuf(STableQuerySupportObj *pSupporter, SQuery *pQuery) pSupporter->offset += 1; } +int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* pWindowRes) { + SQuery *pQuery = pRuntimeEnv->pQuery; +// bool hasMainFunction = hasMainOutput(pQuery); + + int64_t maxOutput = 0; + for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; + + /* + * ts, tag, tagprj function can not decide the output number of current query + * the number of output result is decided by main output + */ + if (/*hasMainFunction &&*/ + (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) { + continue; + } + + SResultInfo *pResultInfo = &pWindowRes->resultInfo[j]; + if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) { + maxOutput = pResultInfo->numOfRes; + } + } + + return maxOutput; +} + int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) { // calculate the maximum required space @@ -5708,6 +5663,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery Position * posList = calloc(1, sizeof(Position) * (end - start)); SMeterDataInfo **pTableList = malloc(POINTER_BYTES * (end - start)); + //todo opt for the case of one table per group int32_t numOfMeters = 0; for (int32_t i = start; i < end; ++i) { int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid; @@ -5717,7 +5673,9 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery pTableList[numOfMeters] = &pMeterDataInfo[i]; // set the merge start position: page:0, index:0 posList[numOfMeters].pageIdx = 0; - posList[numOfMeters++].rowIdx = 0; + posList[numOfMeters].rowIdx = 0; + + numOfMeters += 1; } } @@ -5731,7 +5689,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery SCompSupporter cs = {pTableList, posList, pSupporter}; SLoserTreeInfo *pTree = NULL; - tLoserTreeCreate(&pTree, numOfMeters, &cs, meterResultComparator); + tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn); SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; resetMergeResultBuf(pQuery, pCtx); @@ -5740,66 +5698,93 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery int64_t startt = taosGetTimestampMs(); - while (1) { - int32_t pos = pTree->pNode[0].index; - Position * position = &cs.pPosition[pos]; - SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf; - tFilePage * pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); + while (1) {//todo add iterator + int32_t pos = pTree->pNode[0].index; + Position * position = &cs.pPosition[pos]; + +// SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf; +// tFilePage *pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); + SWindowResInfo* pWindowResInfo = &pTableList[pos]->pMeterQInfo->windowResInfo; + SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, position->rowIdx); + + char* b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); + TSKEY ts = GET_INT64_VAL(b); + +// int64_t ts = getCurrentTimestamp(&cs, pos); + assert(ts > 0 && ts == pWindowRes->window.skey); - int64_t ts = getCurrentTimestamp(&cs, pos); - assert(ts > 0); + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); + if (num <= 0) { + cs.pPosition[pos].rowIdx += 1; + + if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) { + cs.pPosition[pos].rowIdx = -1; - if (ts == lastTimestamp) { // merge with the last one - doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true); - } else { - // copy data to disk buffer - if (buffer[0]->numOfElems == pQuery->pointsToRead) { - if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { - return -1; + // all input sources are exhausted + if (--numOfMeters == 0) { + break; } - - resetMergeResultBuf(pQuery, pCtx); } + } else { + if (ts == lastTimestamp) { // merge with the last one + doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ true); + } else { + // copy data to disk buffer + if (buffer[0]->numOfElems == pQuery->pointsToRead) { + if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { + return -1; + } + + resetMergeResultBuf(pQuery, pCtx); + } - pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); - if (pPage->numOfElems <= 0) { // current source data page is empty +// pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); +// if (pPage->numOfElems <= 0) { // current source data page is empty // do nothing - } else { - doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, false); +// } else { + doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ false); buffer[0]->numOfElems += 1; +// } } - } - - lastTimestamp = ts; - - if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) { - cs.pPosition[pos].rowIdx = 0; - cs.pPosition[pos].pageIdx += 1; // try next page - - // check if current page is empty or not. if it is empty, ignore it and try next - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); - if (cs.pPosition[pos].pageIdx <= list.size - 1) { - tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); - - // if current source data page is null, it must be the last page of source output page - if (newPage->numOfElems <= 0) { - cs.pPosition[pos].pageIdx += 1; - assert(cs.pPosition[pos].pageIdx >= list.size - 1); - } - } - - // the following code must be executed if current source pages are exhausted - if (cs.pPosition[pos].pageIdx >= list.size) { - cs.pPosition[pos].pageIdx = -1; + + lastTimestamp = ts; + + if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) { cs.pPosition[pos].rowIdx = -1; - + // all input sources are exhausted if (--numOfMeters == 0) { break; } +// if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) { +// cs.pPosition[pos].rowIdx = 0; +// cs.pPosition[pos].pageIdx += 1; // try next page +// +// // check if current page is empty or not. if it is empty, ignore it and try next +// SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); +// if (cs.pPosition[pos].pageIdx <= list.size - 1) { +// tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); +// +// // if current source data page is null, it must be the last page of source output page +// if (newPage->numOfElems <= 0) { +// cs.pPosition[pos].pageIdx += 1; +// assert(cs.pPosition[pos].pageIdx >= list.size - 1); +// } +// } +// +// // the following code must be executed if current source pages are exhausted +// if (cs.pPosition[pos].pageIdx >= list.size) { +// cs.pPosition[pos].pageIdx = -1; +// cs.pPosition[pos].rowIdx = -1; +// +// // all input sources are exhausted +// if (--numOfMeters == 0) { +// break; +// } +// } + } else { + cs.pPosition[pos].rowIdx += 1; } - } else { - cs.pPosition[pos].rowIdx += 1; } tLoserTreeAdjust(pTree, pos + pTree->numOfEntries); @@ -5901,7 +5886,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { continue; } - SWindowResult *buf = &pWindowResInfo->pResult[i]; + SWindowResult *buf = getWindowResult(pWindowResInfo, i); // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -6165,14 +6150,11 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus SWAP(pQuery->skey, pQuery->ekey, TSKEY); pQuery->lastKey = pQuery->skey; pRuntimeEnv->startPos = pRuntimeEnv->endPos; - - SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY); } static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->skey, pQuery->ekey, TSKEY); - SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY); pQuery->lastKey = pStatus->lastKey; pQuery->skey = pStatus->skey; @@ -6249,7 +6231,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; + SWindowResult *pResult = getWindowResult(pWindowResInfo, i); if (!pResult->status.closed) { continue; } @@ -6587,25 +6569,24 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) free(pMeterQueryInfo); } -void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, - TSKEY skey, TSKEY ekey) { +void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey) { if (pMeterQueryInfo == NULL) { return; } - //order has change already! + // order has change already! int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (!QUERY_IS_ASC_QUERY(pQuery)) { assert(pMeterQueryInfo->ekey >= pMeterQueryInfo->lastKey + step); } else { assert(pMeterQueryInfo->ekey <= pMeterQueryInfo->lastKey + step); } - + pMeterQueryInfo->ekey = pMeterQueryInfo->lastKey + step; - + SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY); pMeterQueryInfo->lastKey = pMeterQueryInfo->skey; - + // pMeterQueryInfo->queryRangeSet = 0; pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1; pMeterQueryInfo->cur.vnodeIndex = -1; @@ -7009,23 +6990,24 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet * @param pRuntimeEnv * @param pDataBlockInfoEx */ -void setExecutionContext(STableQuerySupportObj *pSupporter, SWindowResult *outputRes, int32_t meterIdx, - int32_t groupIdx, SMeterQueryInfo *pMeterQueryInfo) { +void setExecutionContext(STableQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t meterIdx, + int32_t groupIdx, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - int32_t GROUP_RES_ID = 1; + SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; + int32_t GROUPRESULTID = 1; - SWindowResult *pWindowRes = - doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIdx, sizeof(groupIdx)); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIdx, sizeof(groupIdx)); if (pWindowRes == NULL) { return; } - // not assign result buffer yet, add new result buffer - // all group belong to one result set, and each group result has different group id so set the id to be one + /* + * not assign result buffer yet, add new result buffer + * all group belong to one result set, and each group result has different group id so set the id to be one + */ if (pWindowRes->pos.pageId == -1) { - int32_t ret = - addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUP_RES_ID, pRuntimeEnv->numOfRowsPerPage); - if (ret != 0) { + if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage) != + TSDB_CODE_SUCCESS) { return; } } @@ -7033,18 +7015,8 @@ void setExecutionContext(STableQuerySupportObj *pSupporter, SWindowResult *outpu setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); - vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]); - - // set the right cursor position for ts buffer - if (pSupporter->runtimeEnv.pTSBuf != NULL) { - if (pMeterQueryInfo->cur.vnodeIndex == -1) { - pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; - - tsBufGetElemStartPos(pSupporter->runtimeEnv.pTSBuf, 0, pMeterQueryInfo->tag); - } else { - tsBufSetCursor(pSupporter->runtimeEnv.pTSBuf, &pMeterQueryInfo->cur); - } - } + pMeterQueryInfo->lastKey = nextKey; + setAdditionalInfo(pSupporter, meterIdx, pMeterQueryInfo); } static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { @@ -7072,101 +7044,43 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * } } -static char *getOutputResPos(SQueryRuntimeEnv *pRuntimeEnv, tFilePage *pData, int32_t row, int32_t col) { - // the output for each record should be less than the DEFAULT_INTERN_BUF_SIZE - assert(pRuntimeEnv->pCtx[col].outputBytes <= DEFAULT_INTERN_BUF_SIZE); - - return (char *)pData->data + pRuntimeEnv->offset[col] * pRuntimeEnv->numOfRowsPerPage + - pRuntimeEnv->pCtx[col].outputBytes * row; -} - -void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - - if (pRuntimeEnv->scanFlag == SUPPLEMENTARY_SCAN && numOfIncrementRes > 0) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; - if (functionId == TSDB_FUNC_TS) { - assert(*(TSKEY *)pCtx[i].aOutputBuf == pCtx[i].nStartQueryTimestamp); - } - } - } -} - -int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { - SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo; - - STimeWindow win = getActiveTimeWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery); - - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE); - if (pWindowRes == NULL) { - return -1; - } - - // not allocated yet, allocate new buffer - if (pWindowRes->pos.pageId == -1) { - int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pMeterQueryInfo->sid, pRuntimeEnv->numOfRowsPerPage); - if (ret != 0) { - return -1; - } - } - - pWindowRes->window = win; - - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].resultInfo = &pWindowRes->resultInfo[i]; - pRuntimeEnv->pCtx[i].aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t setIntervalQueryExecutionContext(STableQuerySupportObj *pSupporter, int32_t meterIdx, - SMeterQueryInfo *pMeterQueryInfo) { +int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; assert(pMeterQueryInfo->lastKey > 0); - // not enough disk space or memory buffer for intermediate results - if (setOutputBufferForIntervalQuery(pRuntimeEnv, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { - return -1; - } - - initCtxOutputBuf(pRuntimeEnv); vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]); // both the master and supplement scan needs to set the correct ts comp start position - if (pSupporter->runtimeEnv.pTSBuf != NULL) { + if (pRuntimeEnv->pTSBuf != NULL) { if (pMeterQueryInfo->cur.vnodeIndex == -1) { pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key; - tsBufGetElemStartPos(pSupporter->runtimeEnv.pTSBuf, 0, pMeterQueryInfo->tag); + tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag); // keep the cursor info of current meter - pMeterQueryInfo->cur = pSupporter->runtimeEnv.pTSBuf->cur; + pMeterQueryInfo->cur = pRuntimeEnv->pTSBuf->cur; } else { - tsBufSetCursor(pSupporter->runtimeEnv.pTSBuf, &pMeterQueryInfo->cur); + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pMeterQueryInfo->cur); } } return 0; } -int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) { - assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1); - - TSKEY key = -1; - if (IS_DATA_BLOCK_LOADED(blockStatus)) { - key = pPrimaryCol[pQuery->pos]; - } else { // while the data block is not loaded, the position must be the first or last position - assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0); - key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast; - } - - assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))); - return key; -} +// int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) { +// assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1); +// +// TSKEY key = -1; +// if (IS_DATA_BLOCK_LOADED(blockStatus)) { +// key = pPrimaryCol[pQuery->pos]; +// } else { // while the data block is not loaded, the position must be the first or last position +// assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0); +// key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast; +// } +// +// assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))); +// return key; +//} /* * There are two cases to handle: @@ -7227,14 +7141,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO } } - win = getActiveTimeWindow(pWindowResInfo, pQuery->skey, pQuery); - SWindowResult *pWindowRes = - doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&pQuery->skey, TSDB_KEYSIZE); - if (pWindowRes == NULL) { - return; - } - - pWindowRes->window = win; pMeterQueryInfo->queryRangeSet = 1; pMeterQueryInfo->lastKey = pQuery->skey; pMeterQueryInfo->skey = pQuery->skey; @@ -7297,7 +7203,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk if (((pQuery->lastKey <= pBlock->keyFirst && pQuery->ekey >= pBlock->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey <= pBlock->keyFirst && pQuery->lastKey >= pBlock->keyLast && !QUERY_IS_ASC_QUERY(pQuery))) && onDemand) { - int32_t req = 0; + uint32_t req = 0; if (pQuery->numOfFilterCols > 0) { req = BLK_DATA_ALL_NEEDED; } else { @@ -7369,9 +7275,8 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk /* find first qualified record position in this block */ if (loadTS) { - /* find first qualified record position in this block */ - pQuery->pos = - searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->lastKey, pQuery->order.order); + pQuery->pos = searchFn((char *)primaryKeys, pBlock->numOfPoints, pQuery->lastKey, pQuery->order.order); + /* boundary timestamp check */ assert(pBlock->keyFirst == primaryKeys[0] && pBlock->keyLast == primaryKeys[pBlock->numOfPoints - 1]); } @@ -7394,20 +7299,6 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) { return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (pQuery->intervalTime > 0)); } -static void validateResultBuf(STableQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { - SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - SQuery * pQuery = pSupporter->runtimeEnv.pQuery; - SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - - SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); - int32_t id = getLastPageId(&list); - - tFilePage *newOutput = getResultBufferPageById(pResultBuf, id); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - assert(pRuntimeEnv->pCtx[i].aOutputBuf - newOutput->data < DEFAULT_INTERN_BUF_SIZE); - } -} - static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; @@ -7506,8 +7397,23 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { assert(pQuery->pointsRead <= pQuery->pointsToRead); } +static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + // update the number of result for each, only update the number of rows for the corresponding window result. + if (pQuery->intervalTime == 0) { + int32_t g = pMeterDataInfo->groupIdx; + assert(pRuntimeEnv->windowResInfo.size > 0); + + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g)); + if (pWindowRes->numOfRows == 0) { + pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv); + } + } +} + void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo, - SBlockInfo *pBlockInfo, SField *pFields, __block_search_fn_t searchFn) { + SBlockInfo *pBlockInfo, SField *pFields, __block_search_fn_t searchFn) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; SMeterQueryInfo * pMeterQueryInfo = pMeterDataInfo->pMeterQInfo; @@ -7525,17 +7431,10 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, forwardStep, pFields, pBlockInfo, pWindowResInfo, searchFn); } - // update the number of result for each - if (pQuery->intervalTime == 0) { // todo refactor - SWindowResInfo *p1 = &pRuntimeEnv->windowResInfo; - for (int32_t i = 0; i < p1->size; ++i) { - SWindowResult* pResult = getWindowResult(p1, i); - if (isWindowResClosed(p1, i) && pResult->numOfRows == 0) { - pResult->numOfRows = getNumOfResult(pRuntimeEnv); - } - } - } + assert(numOfRes >= 0); + // todo merge refactor + updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updatelastkey(pQuery, pMeterQueryInfo); } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index ecffc437a93670c92db9a401906f6d24003c794b..f75e3aebfd5ac3cf87664c72c9d753e5b20181eb 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -84,7 +84,7 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } -static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { +static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo) { SQuery * pQuery = &pQInfo->query; STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter; SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv; @@ -119,18 +119,18 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { pQInfo->pObj = pMeterObj; pRuntimeEnv->pMeterObj = pMeterObj; - if (pMeterInfo[k].pMeterQInfo == NULL) { - pMeterInfo[k].pMeterQInfo = + if (pMeterDataInfo[k].pMeterQInfo == NULL) { + pMeterDataInfo[k].pMeterQInfo = createMeterQueryInfo(pSupporter, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey); } - if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer - setMeterDataInfo(&pMeterInfo[k], pMeterObj, k, groupIdx); + if (pMeterDataInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer + setMeterDataInfo(&pMeterDataInfo[k], pMeterObj, k, groupIdx); } - assert(pMeterInfo[k].meterOrderIdx == k && pMeterObj == pMeterInfo[k].pMeterObj); + assert(pMeterDataInfo[k].meterOrderIdx == k && pMeterObj == pMeterDataInfo[k].pMeterObj); - SMeterQueryInfo *pMeterQueryInfo = pMeterInfo[k].pMeterQInfo; + SMeterQueryInfo *pMeterQueryInfo = pMeterDataInfo[k].pMeterQInfo; restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); /* @@ -208,11 +208,10 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { } if (pQuery->intervalTime == 0) { - setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, k, pMeterInfo[k].groupIdx, - pMeterQueryInfo); + setExecutionContext(pSupporter, pMeterQueryInfo, k, pMeterDataInfo[k].groupIdx, key); } else { setIntervalQueryRange(pMeterQueryInfo, pSupporter, key); - int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); + int32_t ret = setAdditionalInfo(pSupporter, k, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { pQInfo->killed = 1; return; @@ -224,14 +223,14 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { // only record the key on last block SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus); - SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); + SBlockInfo binfo = getBlockInfo(pRuntimeEnv); dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, bstatus:%d", GET_QINFO_ADDR(pQuery), binfo.keyFirst, binfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos, pRuntimeEnv->blockStatus); totalBlocks++; - stableApplyFunctionsOnBlock(pSupporter, &pMeterInfo[k], &binfo, NULL, searchFn); + stableApplyFunctionsOnBlock(pSupporter, &pMeterDataInfo[k], &binfo, NULL, searchFn); if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; @@ -400,8 +399,8 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo SCompBlock *pBlock = pInfoEx->pBlock.compBlock; bool ondemandLoad = onDemandLoadDatablock(pQuery, pMeterQueryInfo->queryRangeSet); - int32_t ret = LoadDatablockOnDemand(pBlock, &pInfoEx->pBlock.fields, &pRuntimeEnv->blockStatus, pRuntimeEnv, - fileIdx, pInfoEx->blockIndex, searchFn, ondemandLoad); + ret = LoadDatablockOnDemand(pBlock, &pInfoEx->pBlock.fields, &pRuntimeEnv->blockStatus, pRuntimeEnv, fileIdx, + pInfoEx->blockIndex, searchFn, ondemandLoad); if (ret != DISK_DATA_LOADED) { pSummary->skippedFileBlocks++; continue; @@ -425,21 +424,22 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo // if data block is not loaded, it must be the intermediate blocks assert((pBlock->keyFirst >= pQuery->lastKey && pBlock->keyLast <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); + nextKey = QUERY_IS_ASC_QUERY(pQuery)? pBlock->keyFirst:pBlock->keyLast; } if (pQuery->intervalTime == 0) { - setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, pOneMeterDataInfo->meterOrderIdx, - pOneMeterDataInfo->groupIdx, pMeterQueryInfo); - } else /* if (pQuery->intervalTime > 0)*/ { // interval query + setExecutionContext(pSupporter, pMeterQueryInfo, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx, + nextKey); + } else { // interval query setIntervalQueryRange(pMeterQueryInfo, pSupporter, nextKey); - ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + ret = setAdditionalInfo(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { tfree(pReqMeterDataInfo); // error code has been set pQInfo->killed = 1; return; } } - + stableApplyFunctionsOnBlock(pSupporter, pOneMeterDataInfo, &binfo, pInfoEx->pBlock.fields, searchFn); } @@ -849,8 +849,8 @@ static void doOrderedScan(SQInfo *pQInfo) { } static void setupMeterQueryInfoForSupplementQuery(STableQuerySupportObj *pSupporter) { - SQuery* pQuery = pSupporter->runtimeEnv.pQuery; - + SQuery *pQuery = pSupporter->runtimeEnv.pQuery; + for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; changeMeterQueryInfoForSuppleQuery(pQuery, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); @@ -1101,7 +1101,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter doFinalizeResult(pRuntimeEnv); -// int64_t maxOutput = getNumOfResult(pRuntimeEnv); + // int64_t maxOutput = getNumOfResult(pRuntimeEnv); // here we can ignore the records in case of no interpolation // todo handle offset, in case of top/bottom interval query @@ -1109,8 +1109,8 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter pQuery->interpoType == TSDB_INTERPO_NONE) { // maxOutput <= 0, means current query does not generate any results int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); - - int32_t c = MIN(numOfClosed, pQuery->limit.offset); + + int32_t c = MIN(numOfClosed, pQuery->limit.offset); clearFirstNTimeWindow(pRuntimeEnv, c); pQuery->limit.offset -= c; } else { @@ -1127,16 +1127,16 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter break; } -// /* -// * the scan limitation mechanism is upon here, -// * 1. since there is only one(k) record is generated in one scan operation -// * 2. remain space is not sufficient for next query output, abort -// */ -// if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || -// ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { -// setQueryStatus(pQuery, QUERY_RESBUF_FULL); -// break; -// } + // /* + // * the scan limitation mechanism is upon here, + // * 1. since there is only one(k) record is generated in one scan operation + // * 2. remain space is not sufficient for next query output, abort + // */ + // if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || + // ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { + // setQueryStatus(pQuery, QUERY_RESBUF_FULL); + // break; + // } } } @@ -1158,7 +1158,7 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx = 0; // always start from 0 pQuery->pointsRead = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - + clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx); } @@ -1221,11 +1221,11 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { assert(pQInfo->refCount >= 1); - SQuery * pQuery = &pQInfo->query; - SMeterObj *pMeterObj = pQInfo->pObj; - STableQuerySupportObj* pSupporter = pQInfo->pTableQuerySupporter; - SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - + SQuery * pQuery = &pQInfo->query; + SMeterObj * pMeterObj = pQInfo->pObj; + STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter; + SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; + assert(pRuntimeEnv->pMeterObj == pMeterObj); dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid, @@ -1265,13 +1265,13 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) { pQuery->pointsRead = 0; pSupporter->subgroupIdx = 0; // always start from 0 - + if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQInfo->pointsRead += pQuery->pointsRead; - + clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx); - + if (pQuery->pointsRead > 0) { dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,