From ca136d47de7d0c3dcba5547f8aaad45617903e0b Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 14 Jan 2020 10:39:30 +0800 Subject: [PATCH] add the support of issue #1131. [tbase-901] --- src/client/src/tscParseInsert.c | 1 - src/client/src/tscSql.c | 2 +- src/system/detail/inc/vnodeQueryImpl.h | 7 +- src/system/detail/inc/vnodeRead.h | 44 +- src/system/detail/src/vnodeQueryImpl.c | 916 ++++++++++++---------- src/system/detail/src/vnodeQueryProcess.c | 34 +- 6 files changed, 530 insertions(+), 474 deletions(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index c8a2c86b14..6ef28a55f6 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1307,7 +1307,6 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { tscTrace("continue parse sql: %s", pSql->asyncTblPos); } - if (tscIsInsertOrImportData(pSql->sqlstr)) { /* * only for async multi-vnode insertion diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a0ed09fd0f..07587a1500 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -829,7 +829,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd* pCmd) { char* z = NULL; if (len > 0) { - z = strstr (pCmd->payload, "invalid sql"); + z = strstr (pCmd->payload, "invalid SQL"); } return z != NULL; diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 7de221e8d4..7870c0a9ed 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -129,6 +129,7 @@ bool isPointInterpoQuery(SQuery* pQuery); bool isTopBottomQuery(SQuery* pQuery); bool isFirstLastRowQuery(SQuery* pQuery); bool isTSCompQuery(SQuery* pQuery); +bool notHasQueryTimeRange(SQuery *pQuery); bool needSupplementaryScan(SQuery* pQuery); bool onDemandLoadDatablock(SQuery* pQuery, int16_t queryRangeSet); @@ -172,10 +173,10 @@ void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj* pSupporter); void copyFromGroupBuf(SQInfo* pQInfo, SOutputRes* result); -SBlockInfo getBlockBasicInfo(void* pBlock, int32_t blockType); -SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQuery* pQuery, int32_t slot); +SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType); +SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); -void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus, char* data, +void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus, SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, __block_search_fn_t searchFn); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 38308bb12a..7673f770db 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -35,19 +35,19 @@ typedef struct { int32_t fileId; } SPositionInfo; -typedef struct SQueryLoadBlockInfo { +typedef struct SLoadDataBlockInfo { int32_t fileListIndex; /* index of this file in files list of this vnode */ int32_t fileId; int32_t slotIdx; int32_t sid; bool tsLoaded; // if timestamp column of current block is loaded or not -} SQueryLoadBlockInfo; +} SLoadDataBlockInfo; -typedef struct SQueryLoadCompBlockInfo { +typedef struct SLoadCompBlockInfo { int32_t sid; /* meter sid */ int32_t fileId; int32_t fileListIndex; -} SQueryLoadCompBlockInfo; +} SLoadCompBlockInfo; /* * the header file info for one vnode @@ -126,20 +126,28 @@ typedef struct RuntimeEnvironment { SQuery* pQuery; SMeterObj* pMeterObj; SQLFunctionCtx* pCtx; - SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ - SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ - SQueryFilesInfo vnodeFileInfo; - int16_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; - int16_t scanFlag; // denotes reversed scan of data or not - SInterpolationInfo interpoInfo; - SData** pInterpoBuf; - SOutputRes* pResult; // reference to SQuerySupporter->pResult - void* hashList; - int32_t usedIndex; // assigned SOutputRes in list - STSBuf* pTSBuf; - STSCursor cur; - SQueryCostSummary summary; + SLoadDataBlockInfo loadBlockInfo; /* record current block load information */ + SLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ + SQueryFilesInfo vnodeFileInfo; + int16_t numOfRowsPerPage; + int16_t offset[TSDB_MAX_COLUMNS]; + int16_t scanFlag; // denotes reversed scan of data or not + SInterpolationInfo interpoInfo; + SData** pInterpoBuf; + SOutputRes* pResult; // reference to SQuerySupporter->pResult + void* hashList; + int32_t usedIndex; // assigned SOutputRes in list + STSBuf* pTSBuf; + STSCursor cur; + SQueryCostSummary summary; + + /* + * Temporarily hold the in-memory cache block info during scan cache blocks + * Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime + * during the query by the subumit/insert handling threads. + * So we keep a copy of the support structure as well as the cache block data itself. + */ + SCacheBlock cacheBlock; } SQueryRuntimeEnv; /* intermediate result during multimeter query involves interval */ diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 6d6d191c0b..09030b14a8 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -54,14 +54,14 @@ enum { static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); -static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo); +static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo); static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn, bool loadData); static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterHeadDataInfo, int32_t start, int32_t end); -static TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index); +static TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index); static TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index); static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos); @@ -71,11 +71,11 @@ static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pRes static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey); static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo, - SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields, + SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, __block_search_fn_t searchFn); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); -static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, +static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, __block_search_fn_t searchFn); @@ -221,7 +221,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj SQuery *pQuery = pRuntimeEnv->pQuery; // check if data file header of this table has been loaded into memory, avoid to reloaded comp Block info - SQueryLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo; + SLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo; // if vnodeFreeFields is called, the pQuery->pFields is NULL if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && @@ -235,14 +235,14 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj } static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex, int32_t sid) { - SQueryLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo; - - pLoadCompBlockInfo->sid = sid; - pLoadCompBlockInfo->fileListIndex = fileIndex; - pLoadCompBlockInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID; + SLoadCompBlockInfo *pCompBlockLoadInfo = &pRuntimeEnv->loadCompBlockInfo; + + pCompBlockLoadInfo->sid = sid; + pCompBlockLoadInfo->fileListIndex = fileIndex; + pCompBlockLoadInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID; } -static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo) { +static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { pCompBlockLoadInfo->sid = -1; pCompBlockLoadInfo->fileId = -1; pCompBlockLoadInfo->fileListIndex = -1; @@ -251,13 +251,11 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, bool loadPrimaryTS) { SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; + SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; /* this block has been loaded into memory, return directly */ if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 && - pLoadInfo->sid == pMeterObj->sid) { - assert(fileIndex == pLoadInfo->fileListIndex); - + pLoadInfo->sid == pMeterObj->sid && pLoadInfo->fileListIndex == fileIndex) { // previous load operation does not load the primary timestamp column, we only need to load the timestamp column if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) { return DISK_BLOCK_LOAD_TS; @@ -272,7 +270,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj * static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, bool tsLoaded) { SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; + SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; pLoadInfo->fileId = pQuery->fileId; pLoadInfo->slotIdx = pQuery->slot; @@ -281,7 +279,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj pLoadInfo->tsLoaded = tsLoaded; } -static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { +static void vnodeInitDataBlockInfo(SLoadDataBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->slotIdx = -1; pBlockLoadInfo->fileId = -1; pBlockLoadInfo->sid = -1; @@ -990,7 +988,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR } // todo ignore the blockType, pass the pQuery into this function -SBlockInfo getBlockBasicInfo(void *pBlock, int32_t blockType) { +SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void *pBlock, int32_t blockType) { SBlockInfo blockInfo = {0}; if (IS_FILE_BLOCK(blockType)) { SCompBlock *pDiskBlock = (SCompBlock *)pBlock; @@ -1002,8 +1000,8 @@ SBlockInfo getBlockBasicInfo(void *pBlock, int32_t blockType) { } else { SCacheBlock *pCacheBlock = (SCacheBlock *)pBlock; - blockInfo.keyFirst = getTimestampInCacheBlock(pCacheBlock, 0); - blockInfo.keyLast = getTimestampInCacheBlock(pCacheBlock, pCacheBlock->numOfPoints - 1); + blockInfo.keyFirst = getTimestampInCacheBlock(pRuntimeEnv, pCacheBlock, 0); + blockInfo.keyLast = getTimestampInCacheBlock(pRuntimeEnv, pCacheBlock, pCacheBlock->numOfPoints - 1); blockInfo.size = pCacheBlock->numOfPoints; blockInfo.numOfCols = pCacheBlock->pMeterObj->numOfColumns; } @@ -1073,14 +1071,37 @@ void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, in position->pos = pos; } -static FORCE_INLINE void saveNextAccessPositionInCache(SPositionInfo *position, int32_t slotIdx, int32_t pos) { - savePointPosition(position, -1, slotIdx, pos); +bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj) { + if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { + + SMeterObj* pNewMeterObj = pBlock->pMeterObj; + char* id = (pNewMeterObj != NULL)? pNewMeterObj->meterId:NULL; + + dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, " + "blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, + pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); + + return false; + } + + if (pBlock->numOfPoints == 0) { + dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d," + "allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, + pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); + + return false; + } + + return true; } // todo all functions that call this function should check the returned data blocks status -SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slot) { +SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot) { + SQuery* pQuery = pRuntimeEnv->pQuery; + SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; - if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0) { + if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0 || slot >= pCacheInfo->maxBlocks) { return NULL; } @@ -1088,21 +1109,88 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; if (pBlock == NULL) { - dError("QInfo:%p NULL Block In Cache, available block:%d, last block:%d, accessed null block:%d, pBlockId:%d", - GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId); + // the cache info snapshot must be existed. + int32_t curNumOfBlocks = pCacheInfo->numOfBlocks; + int32_t curSlot = pCacheInfo->currentSlot; + + dError("QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, " + "last block:%d), accessed null block:%d, pBlockId:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, + pQuery->currentSlot, curNumOfBlocks, curSlot, slot, pQuery->blockId); + return NULL; } - if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { - dWarn( - "QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, " - "blockMeterObj:%p", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, - pQuery->blockId, pMeterObj, pBlock->pMeterObj); + // block is empty or block does not belongs to current table, return NULL value + if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) { return NULL; } - - return pBlock; + + //the accessed cache block has been loaded already, return directly + if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD) { + TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0); + TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1); + + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, " + "slot:%d, brange:%lld-%lld, rows:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, + pQuery->slot, skey, ekey, pBlock->numOfPoints); + + return &pRuntimeEnv->cacheBlock; + } + + // keep the structure as well as the block data into local buffer + memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock)); + + // keep the data from in cache into the temporarily allocated buffer + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i]; + + int16_t columnIndex = pColumnInfoEx->colIdx; + int16_t columnIndexInBuf = pColumnInfoEx->colIdxInBuf; + + SColumn* pCol = &pMeterObj->schema[columnIndex]; + + int16_t bytes = pCol->bytes; + int16_t type = pCol->type; + + char* dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data; + + if (pQuery->colList[i].colIdx != -1) { + assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes && + type == pColumnInfoEx->data.type); + + memcpy(dst, pBlock->offset[columnIndex], pBlock->numOfPoints * bytes); + } else { + setNullN(dst, type, bytes, pBlock->numOfPoints); + } + } + + // if the primary timestamp are not loaded by default, always load it here into buffer + if(!PRIMARY_TSCOL_LOADED(pQuery)) { + memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0], TSDB_KEYSIZE*pBlock->numOfPoints); + } + + pQuery->fileId = -1; + pQuery->slot = slot; + + if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) { + return NULL; + } + + /* + * the accessed cache block still belongs to current meterObj, go on + * update the load data block info + */ + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, -1, true); + + TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0); + TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1); + + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load cache block, ts:%d, slot:%d, brange:%lld-%lld, rows:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, + pQuery->slot, skey, ekey, pBlock->numOfPoints); + + return &pRuntimeEnv->cacheBlock; } static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) { @@ -1110,11 +1198,13 @@ static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) { return &pQuery->pBlock[slot]; } -static void *getGenericDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slot) { +static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) { + SQuery* pQuery = pRuntimeEnv->pQuery; + if (IS_DISK_DATA_BLOCK(pQuery)) { return getDiskDataBlock(pQuery, slot); } else { - return getCacheDataBlock(pMeterObj, pQuery, slot); + return getCacheDataBlock(pMeterObj, pRuntimeEnv, slot); } } @@ -1203,14 +1293,6 @@ static bool getQualifiedDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRunti return true; } -static char *doGetDataBlockImpl(const char *sdata, int32_t colIdx, bool isDiskFileBlock) { - if (isDiskFileBlock) { - return ((SData **)sdata)[colIdx]->data; - } else { - return ((SCacheBlock *)sdata)->offset[colIdx]; - } -} - static SField *getFieldInfo(SQuery *pQuery, SBlockInfo *pBlockInfo, SField *pFields, int32_t column) { // no SField info exist, or column index larger than the output column, no result. if (pFields == NULL || column >= pQuery->numOfOutputCols) { @@ -1261,30 +1343,13 @@ static bool hasNullVal(SQuery *pQuery, int32_t col, SBlockInfo *pBlockInfo, SFie return ret; } -static char *doGetDataBlocks(bool isDiskFileBlock, SQueryRuntimeEnv *pRuntimeEnv, char *data, int32_t colIdx, - int32_t colId, int16_t type, int16_t bytes, int32_t tmpBufIndex) { - char *pData = NULL; - - if (isDiskFileBlock) { - pData = doGetDataBlockImpl(data, colIdx, isDiskFileBlock); - } else { - SCacheBlock *pCacheBlock = (SCacheBlock *)data; - SMeterObj * pMeter = pRuntimeEnv->pMeterObj; - - if (colIdx < 0 || pMeter->numOfColumns <= colIdx || pMeter->schema[colIdx].colId != colId) { - // data in cache is not current available, we need fill the data block in null value - pData = pRuntimeEnv->colDataBuffer[tmpBufIndex]->data; - setNullN(pData, type, bytes, pCacheBlock->numOfPoints); - } else { - pData = doGetDataBlockImpl(data, colIdx, isDiskFileBlock); - } - } - +static char *doGetDataBlocks(SQuery* pQuery, SData** data, int32_t colIdx) { + assert(colIdx >= 0 && colIdx < pQuery->numOfCols); + char* pData = data[colIdx]->data; return pData; } -static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeticSupport *sas, int32_t col, - int32_t size, bool isDiskFileBlock) { +static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -1303,21 +1368,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti } for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - int32_t colIdx = isDiskFileBlock ? pQuery->colList[i].colIdxInBuf : pQuery->colList[i].colIdx; - SColumnInfo *pColMsg = &pQuery->colList[i].data; - char * pData = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pColMsg->colId, pColMsg->type, - pColMsg->bytes, pQuery->colList[i].colIdxInBuf); + char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf); sas->elemSize[i] = pColMsg->bytes; sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset } + sas->numOfCols = pQuery->numOfCols; sas->offset = 0; } else { // other type of query function SColIndexEx *pCol = &pQuery->pSelectExpr[col].pBase.colInfo; - int32_t colIdx = isDiskFileBlock ? pCol->colIdxInBuf : pCol->colIdx; - if (TSDB_COL_IS_TAG(pCol->flag)) { dataBlock = NULL; } else { @@ -1326,8 +1387,7 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti * the remain meter may not have the required column in cache actually. * So, the validation of required column in cache with the corresponding meter schema is reinforced. */ - dataBlock = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pCol->colId, pCtx[col].inputType, - pCtx[col].inputBytes, pCol->colIdxInBuf); + dataBlock = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pCol->colIdxInBuf); } } @@ -1339,17 +1399,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti * @param pRuntimeEnv * @param forwardStep * @param primaryKeyCol - * @param data * @param pFields * @param isDiskFileBlock * @return the incremental number of output value, so it maybe 0 for fixed number of query, * such as count/min/max etc. */ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, TSKEY *primaryKeyCol, - char *data, SField *pFields, SBlockInfo *pBlockInfo, bool isDiskFileBlock) { + SField *pFields, SBlockInfo *pBlockInfo) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; - + + bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); @@ -1360,8 +1420,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t SField dummyField = {0}; bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); - char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, forwardStep, isDiskFileBlock); - + char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep); + SField *tpField = NULL; if (pFields != NULL) { @@ -1496,59 +1556,32 @@ static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, return TSDB_CODE_SUCCESS; } -static char *getGroupbyColumnData(SQueryRuntimeEnv *pRuntimeEnv, SField *pFields, SBlockInfo *pBlockInfo, char *data, - bool isDiskFileBlock, int16_t *type, int16_t *bytes) { - SQuery *pQuery = pRuntimeEnv->pQuery; +static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, int16_t* bytes) { char * groupbyColumnData = NULL; - int32_t col = 0; - int16_t colIndexInBuf = 0; - SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { if (pGroupbyExpr->columnInfo[k].flag == TSDB_COL_TAG) { continue; } - + + int16_t colIndex = -1; int32_t colId = pGroupbyExpr->columnInfo[k].colId; - - if (isDiskFileBlock) { // get the required column data in file block according the column ID - for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { - if (colId == pFields[i].colId) { - *type = pFields[i].type; - *bytes = pFields[i].bytes; - col = i; - break; - } - } - - // this column may not in current data block and also not in the required columns list - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - if (colId == pQuery->colList[i].data.colId) { - colIndexInBuf = i; - break; - } - } - } else { // get the required data column in cache - SColumn *pSchema = pRuntimeEnv->pMeterObj->schema; - - for (int32_t i = 0; i < pRuntimeEnv->pMeterObj->numOfColumns; ++i) { - if (colId == pSchema[i].colId) { - *type = pSchema[i].type; - *bytes = pSchema[i].bytes; - - col = i; - colIndexInBuf = i; - break; - } + + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + if (pQuery->colList[i].data.colId == colId) { + colIndex = i; + break; } } - - int32_t columnIndex = isDiskFileBlock ? colIndexInBuf : col; - groupbyColumnData = - doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, columnIndex, colId, *type, *bytes, colIndexInBuf); - + + assert(colIndex >= 0 && colIndex < pQuery->numOfCols); + + *type = pQuery->colList[colIndex].data.type; + *bytes = pQuery->colList[colIndex].data.bytes; + + groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].colIdxInBuf); break; } @@ -1608,9 +1641,12 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx } static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, TSKEY *primaryKeyCol, - char *data, SField *pFields, SBlockInfo *pBlockInfo, bool isDiskFileBlock) { + SField *pFields, SBlockInfo *pBlockInfo) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; + + bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); + SData **data = pRuntimeEnv->colDataBuffer; int64_t prevNumOfRes = 0; bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); @@ -1626,15 +1662,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * char *groupbyColumnData = NULL; if (groupbyStateValue) { - groupbyColumnData = getGroupbyColumnData(pRuntimeEnv, pFields, pBlockInfo, data, isDiskFileBlock, &type, &bytes); + groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes); } for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); - char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, *forwardStep, isDiskFileBlock); - + char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); + TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); @@ -1646,15 +1682,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // set the input column data for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - int32_t colIdx = isDiskFileBlock ? pFilterInfo->info.colIdxInBuf : pFilterInfo->info.colIdx; - SColumnInfo * pColumnInfo = &pFilterInfo->info.data; - /* * NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column, * so we do NOT check if is a tag or not */ - pFilterInfo->pData = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pColumnInfo->colId, - pColumnInfo->type, pColumnInfo->bytes, pFilterInfo->info.colIdxInBuf); + pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf); } int32_t numOfRes = 0; @@ -1803,7 +1835,7 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY } static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn, - char *sdata, SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes) { + SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes) { int32_t forwardStep = 0; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1856,14 +1888,11 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step; } - bool isFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { *numOfRes = - rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, sdata, pFields, pBlockInfo, isFileBlock); + rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo); } else { - *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, sdata, pFields, pBlockInfo, - isFileBlock); + *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo); } assert(*numOfRes >= 0); @@ -1905,7 +1934,7 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, if (order == TSQL_SO_ASC) { int32_t i = 0; - int32_t step = 1; + int32_t step = QUERY_ASC_FORWARD_STEP; while (i pVnodeFiles->pFileInfo[i].fileID) { i += step; @@ -1919,7 +1948,7 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, } } else { int32_t i = numOfFiles - 1; - int32_t step = -1; + int32_t step = QUERY_DESC_FORWARD_STEP; while (i >= 0 && *fid < pVnodeFiles->pFileInfo[i].fileID) { i += step; @@ -1938,13 +1967,13 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter SQuery *pQuery = pRuntimeEnv->pQuery; pQuery->fileId += step; - int32_t fid = 0; + int32_t fileIndex = 0; int32_t order = (step == QUERY_ASC_FORWARD_STEP) ? TSQL_SO_ASC : TSQL_SO_DESC; while (1) { - fid = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, order); + fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, order); // no files left, abort - if (fid < 0) { + if (fileIndex < 0) { if (step == QUERY_ASC_FORWARD_STEP) { dTrace("QInfo:%p no file to access, try data in cache", GET_QINFO_ADDR(pQuery)); } else { @@ -1956,9 +1985,8 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter break; } - // failed to mmap header file into memory will cause the retrieval of compblock info failed - if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) { + if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIndex) > 0) { break; } @@ -1971,15 +1999,15 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter pQuery->fileId += step; /* for backwards search, if the first file is not valid, abort */ - if (step < 0 && fid == 0) { + if (step < 0 && fileIndex == 0) { vnodeFreeFieldsEx(pRuntimeEnv); pQuery->fileId = -1; - fid = -1; + fileIndex = -1; break; } } - return fid; + return fileIndex; } void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, @@ -2165,60 +2193,12 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery // for loading block data in memory assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock); - - // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - int32_t bytes = pQuery->colList[i].data.bytes; - pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes); - if (pRuntimeEnv->colDataBuffer[i] == NULL) { - goto _error_clean; - } - } - - // record the maximum column width among columns of this meter/metric - int32_t maxColWidth = pQuery->colList[0].data.bytes; - for (int32_t i = 1; i < pQuery->numOfCols; ++i) { - int32_t bytes = pQuery->colList[i].data.bytes; - if (bytes > maxColWidth) { - maxColWidth = bytes; - } - } - - pRuntimeEnv->primaryColBuffer = NULL; - if (PRIMARY_TSCOL_LOADED(pQuery)) { - pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0]; - } else { - pRuntimeEnv->primaryColBuffer = - (SData *)malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES); - } - - pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes - - pRuntimeEnv->unzipBuffer = (char *)malloc(pRuntimeEnv->unzipBufSize); - pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize); - - if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL || - pRuntimeEnv->primaryColBuffer == NULL) { - goto _error_clean; - } - return TSDB_CODE_SUCCESS; _error_clean: tfree(pRuntimeEnv->resultInfo); tfree(pRuntimeEnv->pCtx); - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { - tfree(pRuntimeEnv->colDataBuffer[i]); - } - - tfree(pRuntimeEnv->unzipBuffer); - tfree(pRuntimeEnv->secondaryUnzipBuffer); - - if (!PRIMARY_TSCOL_LOADED(pQuery)) { - tfree(pRuntimeEnv->primaryColBuffer); - } - return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -2373,6 +2353,11 @@ bool isFirstLastRowQuery(SQuery *pQuery) { return false; } +bool notHasQueryTimeRange(SQuery *pQuery) { + return (pQuery->skey == 0 && pQuery->ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery))); +} + bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; } bool needSupplementaryScan(SQuery *pQuery) { @@ -2447,11 +2432,13 @@ static int32_t getFirstCacheSlot(int32_t numOfBlocks, int32_t lastSlot, SCacheIn return (lastSlot - numOfBlocks + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; } -static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) { +static bool cacheBoundaryCheck(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj *pMeterObj) { /* * here we get the first slot from the meter cache, not from the cache snapshot from pQuery, since the * snapshot value in pQuery may have been expired now. */ + SQuery* pQuery = pRuntimeEnv->pQuery; + SCacheInfo * pCacheInfo = (SCacheInfo *)pMeterObj->pCache; SCacheBlock *pBlock = NULL; @@ -2475,8 +2462,8 @@ static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) { * pBlock may be null value since this block is flushed to disk, and re-distributes to * other meter, so go on until we get the first not flushed cache block. */ - if ((pBlock = getCacheDataBlock(pMeterObj, pQuery, first)) != NULL) { - keyFirst = getTimestampInCacheBlock(pBlock, 0); + if ((pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, first)) != NULL) { + keyFirst = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0); break; } else { /* @@ -2519,7 +2506,6 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v pQuery->currentSlot = lastSlot; pQuery->numOfBlocks = numOfBlocks; pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo); - ; /* * Note: the block id is continuous increasing, never becomes smaller. @@ -2553,7 +2539,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo assert((pQuery->lastKey >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))); - if (!ignoreQueryRange && !cacheBoundaryCheck(pQuery, pMeterObj)) { + if (!ignoreQueryRange && !cacheBoundaryCheck(pRuntimeEnv, pMeterObj)) { return -1; } @@ -2568,8 +2554,13 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo /* locate the first point of which time stamp is no less than pQuery->skey */ __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - SCacheBlock *pBlock = pCacheInfo->cacheBlocks[*slot]; - (*pos) = searchFn(pBlock->offset[0], pBlock->numOfPoints, pQuery->skey, pQuery->order.order); + pQuery->slot = *slot; + SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + if (pBlock == NULL) { + return -1; + } + + (*pos) = searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->skey, pQuery->order.order); // restore skey before return pQuery->skey = rawskey; @@ -2579,7 +2570,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo return -1; } - int64_t nextKey = getTimestampInCacheBlock(pBlock, *pos); + int64_t nextKey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, *pos); if ((nextKey < pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || (nextKey > pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { // all data are less than the pQuery->lastKey(pQuery->sKey) for asc query @@ -2631,7 +2622,7 @@ bool hasDataInCache(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) { return false; } - return cacheBoundaryCheck(pQuery, pMeterObj); + return cacheBoundaryCheck(pRuntimeEnv, pMeterObj); } /** @@ -2699,7 +2690,7 @@ static void doGetAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey pQuery->lastKey = pQuery->skey; } -static void getOneRowFromDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) { +static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfCols; ++i) { @@ -2708,32 +2699,6 @@ static void getOneRowFromDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, in } } -static void getOneRowFromCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, SCacheBlock *pBlock, - char **dst, int32_t pos) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - /* - * in case of cache block expired, the pos may exceed the number of points in block, so check - * the range in the first place. - */ - if (pos > pBlock->numOfPoints) { - pos = pBlock->numOfPoints; - } - - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - int32_t colIdx = pQuery->colList[i].colIdx; - int32_t colId = pQuery->colList[i].data.colId; - - SColumn *pCols = &pMeterObj->schema[colIdx]; - - if (colIdx < 0 || colIdx >= pMeterObj->numOfColumns || pCols->colId != colId) { // set null - setNull(dst[i], pCols->type, pCols->bytes); - } else { - memcpy(dst[i], pBlock->offset[colIdx] + pos * pCols->bytes, pCols->bytes); - } - } -} - static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; @@ -2759,29 +2724,7 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet pQuery->slot, pQuery->pos); // save the point that is directly after or equals to the specified point - if (IS_DISK_DATA_BLOCK(pQuery)) { - getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos); - } else { - pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - - while (pBlock == NULL) { - // cache block is flushed to disk, try to find new query position again - getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn); - - // new position is located in file, load data and abort - if (IS_DISK_DATA_BLOCK(pQuery)) { - getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos); - break; - } else { - pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - } - } - - if (!IS_DISK_DATA_BLOCK(pQuery)) { - getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pNextPoint, pQuery->pos); - } - } + getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos); /* * 1. for last_row query, return immediately. @@ -2810,12 +2753,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet if (pQuery->pos > 0) { int32_t prevPos = pQuery->pos - 1; - if (IS_DISK_DATA_BLOCK(pQuery)) { - /* save the point that is directly after the specified point */ - getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos); - } else { - getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pPrevPoint, prevPos); - } + /* save the point that is directly after the specified point */ + getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos); } else { __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; @@ -2825,7 +2764,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true); /* - * no previous data exists reset the status and load the data block that contains the qualified point + * no previous data exists. + * reset the status and load the data block that contains the qualified point */ if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%lld-%lld, out of range", @@ -2838,21 +2778,20 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet } else { // prev has been located if (pQuery->fileId >= 0) { pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1; - getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); + getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos); } else { - pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - if (pBlock == NULL) { - // todo nothing, the previous block is flushed to disk - } else { - pQuery->pos = pBlock->numOfPoints - 1; - getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pPrevPoint, pQuery->pos); + // moveToNextBlock make sure there is a available cache block, if exists + assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); + pBlock = &pRuntimeEnv->cacheBlock; + + pQuery->pos = pBlock->numOfPoints - 1; + getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); - qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), - pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); - } + qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), + pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); } } } @@ -2886,6 +2825,86 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn } } +static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, + SPointInterpoSupporter *pPointInterpSupporter, SMeterObj *pMeterObj,TSKEY nextKey) { + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + + if (isFirstLastRowQuery(pQuery)) { + /* + * if the pQuery->skey != pQuery->ekey for last_row query, + * the query range is existed, so set them both the value of nextKey + */ + if (pQuery->skey != pQuery->ekey) { + assert(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery) && nextKey >= pQuery->ekey && + nextKey <= pQuery->skey); + + pQuery->skey = nextKey; + pQuery->ekey = nextKey; + } + + return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); + } else { + getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey); + return true; + } +} + +//TODO refactor code, the best way to implement the last_row is utilizing the iterator +bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter) { + SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; + + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; + + assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery)); + __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; + + TSKEY lastKey = -1; + + // todo copy data into temp buffer to avoid the buffer expired + pQuery->fileId = -1; + vnodeFreeFieldsEx(pRuntimeEnv); + + // keep in-memory cache status in local variables in case that it may be changed by write operation + getBasicCacheInfoSnapshot(pQuery, pMeterObj->pCache, pMeterObj->vnode); + + SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; + if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) { + pQuery->fileId = -1; + TSKEY key = pMeterObj->lastKey; + + pQuery->skey = key; + pQuery->ekey = key; + pQuery->lastKey = pQuery->skey; + + // todo cache block may have been flushed to disk, and no data in cache anymore. + // So, copy cache block to local buffer is required. + lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + } else { // no data in cache, try file + TSKEY key = pMeterObj->lastKeyOnFile; + + pQuery->skey = key; + pQuery->ekey = key; + pQuery->lastKey = pQuery->skey; + + bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); + if (!ret) { // no data in file, return false; + return false; + } + + lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } + + assert(lastKey <= pQuery->skey); + + pQuery->skey = lastKey; + pQuery->ekey = lastKey; + pQuery->lastKey = pQuery->skey; + + return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); +} + /** * determine the first query range, according to raw query range [skey, ekey] and group-by interval. * the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than @@ -2903,7 +2922,7 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); assert(key >= pQuery->skey); - + return doGetQueryPos(key, pSupporter, pPointInterpSupporter); } @@ -2918,30 +2937,26 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); - - } else { // descending order + + } else { // descending order if (dataInCache) { // todo handle error TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); assert(nextKey == -1 || nextKey <= pQuery->skey); - // valid data in cache - if (nextKey != -1) { + if (nextKey != -1) { // find qualified data in cache if (nextKey >= pQuery->ekey) { - if (isFirstLastRowQuery(pQuery)) { - return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); - } else { - getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey); - return true; - } + return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey); } else { /* * nextKey < pQuery->ekey && nextKey < pQuery->lastKey, query range is - * larger than all data, abort NOTE: Interp query does not reach here, since for all interp query, + * larger than all data, abort + * + * NOTE: Interp query does not reach here, since for all interp query, * the query order is ascending order. */ return false; } - } else { // all data in cache are greater than pQuery->lastKey, try file + } else { // all data in cache are greater than pQuery->skey, try file } } @@ -2949,19 +2964,11 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); assert(key <= pQuery->skey); - /* key in query range. If not, no qualified in disk file */ + // key in query range. If not, no qualified in disk file if (key >= pQuery->ekey) { - if (isFirstLastRowQuery(pQuery)) { /* no qualified data in this query range */ - return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); - } else { - getAlignedIntervalQueryRange(pQuery, key, pQuery->skey, pQuery->ekey); - return true; - } - } else { // Goes on in case of key in file less than pMeterObj->lastKey, - // which is also the pQuery->skey - if (isFirstLastRowQuery(pQuery)) { - return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); - } + return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, key); + } else { //In case of all queries, the value of false will be returned if key < pQuery->ekey + return false; } } } @@ -2987,9 +2994,9 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p return -1; } - SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); + SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); if (pBlock != NULL) { - nextTimestamp = getTimestampInCacheBlock(pBlock, position->pos); + nextTimestamp = getTimestampInCacheBlock(pRuntimeEnv, pBlock, position->pos); } else { // todo fix it } @@ -3172,12 +3179,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInf } // update the pQuery->limit.offset value, and pQuery->pos value - TSKEY *keys = NULL; - if (IS_DISK_DATA_BLOCK(pQuery)) { - keys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; - } else { - keys = (TSKEY *)(((SCacheBlock *)pBlock)->offset[0]); - } + TSKEY *keys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; int32_t i = 0; if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -3218,7 +3220,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInf if (IS_DISK_DATA_BLOCK(pQuery)) { pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); } else { - pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos); + pQuery->skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pQuery->pos); } // update the offset value @@ -3252,16 +3254,22 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { // in case of point-interpolation query, use asc order scan - char msg[] = - "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " + const char* msg = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " "new qrange:%lld-%lld"; - // descending order query + // descending order query for last_row query if (isFirstLastRowQuery(pQuery)) { dTrace("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery), pQuery->order.order, TSQL_SO_DESC); pQuery->order.order = TSQL_SO_DESC; + + int64_t skey = MIN(pQuery->skey, pQuery->ekey); + int64_t ekey = MAX(pQuery->skey, pQuery->ekey); + + pQuery->skey = ekey; + pQuery->ekey = skey; + return; } @@ -3334,11 +3342,11 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) { break; } - void *pBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot); + void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); assert(pBlock != NULL); int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType); + SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; assert(maxReads >= 0); @@ -3367,10 +3375,10 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) { return; } - void *pBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot); + void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType); + SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); // get the qualified data that can be skipped int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; @@ -3560,7 +3568,6 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI pCtx->numOfParams = 4; SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf; - pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail)); SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail; @@ -3712,10 +3719,70 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue return TSDB_CODE_SUCCESS; } +static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + int32_t bytes = pQuery->colList[i].data.bytes; + pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes); + if (pRuntimeEnv->colDataBuffer[i] == NULL) { + goto _error_clean; + } + } + + // record the maximum column width among columns of this meter/metric + int32_t maxColWidth = pQuery->colList[0].data.bytes; + for (int32_t i = 1; i < pQuery->numOfCols; ++i) { + int32_t bytes = pQuery->colList[i].data.bytes; + if (bytes > maxColWidth) { + maxColWidth = bytes; + } + } + + pRuntimeEnv->primaryColBuffer = NULL; + if (PRIMARY_TSCOL_LOADED(pQuery)) { + pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0]; + } else { + pRuntimeEnv->primaryColBuffer = + (SData *) malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES); + } + + pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes + + pRuntimeEnv->unzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize); + pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize); + + if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL || + pRuntimeEnv->primaryColBuffer == NULL) { + goto _error_clean; + } + + return TSDB_CODE_SUCCESS; + + _error_clean: + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { + tfree(pRuntimeEnv->colDataBuffer[i]); + } + + tfree(pRuntimeEnv->unzipBuffer); + tfree(pRuntimeEnv->secondaryUnzipBuffer); + + if (!PRIMARY_TSCOL_LOADED(pQuery)) { + tfree(pRuntimeEnv->primaryColBuffer); + } + + return TSDB_CODE_SERV_OUT_OF_MEMORY; +} + int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter, void *param) { SQuery *pQuery = &pQInfo->query; - + int32_t code = TSDB_CODE_SUCCESS; + + /* + * only the successful complete requries the sem_post/over = 1 operations. + */ if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey > pQuery->ekey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->ekey > pQuery->skey))) { dTrace("QInfo:%p no result in time range %lld-%lld, order %d", pQInfo, pQuery->skey, pQuery->ekey, @@ -3723,7 +3790,6 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete sem_post(&pQInfo->dataReady); pQInfo->over = 1; - return TSDB_CODE_SUCCESS; } @@ -3748,6 +3814,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; pRuntimeEnv->pQuery = pQuery; + pRuntimeEnv->pMeterObj = pMeterObj; + + if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) { + return code; + } vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, &dataInDisk, &dataInCache); @@ -3756,8 +3827,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete dTrace("QInfo:%p no result in query", pQInfo); sem_post(&pQInfo->dataReady); pQInfo->over = 1; - - return TSDB_CODE_SUCCESS; + return code; } pRuntimeEnv->pTSBuf = param; @@ -3768,16 +3838,16 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete } // create runtime environment - int32_t ret = setupQueryRuntimeEnv(pMeterObj, pQuery, &pSupporter->runtimeEnv, NULL, pQuery->order.order, false); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + code = setupQueryRuntimeEnv(pMeterObj, pQuery, &pSupporter->runtimeEnv, NULL, pQuery->order.order, false); + if (code != TSDB_CODE_SUCCESS) { + return code; } vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { - return ret; + if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { + return code; } int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); @@ -3788,34 +3858,42 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pRuntimeEnv->usedIndex = 0; pRuntimeEnv->pResult = pSupporter->pResult; } - - // in case of last_row query, we set the query timestamp to pMeterObj->lastKey; - if (isFirstLastRowQuery(pQuery)) { - pQuery->skey = pMeterObj->lastKey; - pQuery->ekey = pMeterObj->lastKey; - pQuery->lastKey = pQuery->skey; - } - + pSupporter->rawSKey = pQuery->skey; pSupporter->rawEKey = pQuery->ekey; - + /* query on single table */ pSupporter->numOfMeters = 1; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - + SPointInterpoSupporter interpInfo = {0}; pointInterpSupporterInit(pQuery, &interpInfo); - - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) || - (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || - (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { - sem_post(&pQInfo->dataReady); - pQInfo->over = 1; - - pointInterpSupporterDestroy(&interpInfo); - return TSDB_CODE_SUCCESS; + + /* + * in case of last_row query without query range, we set the query timestamp to + * pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged. + */ + + if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { + if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpInfo); + return TSDB_CODE_SUCCESS; + } + } else { + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) || + (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || + (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpInfo); + return TSDB_CODE_SUCCESS; + } } - + /* * here we set the value for before and after the specified time into the * parameter for interpolation query @@ -3949,12 +4027,17 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } - + int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true); if (ret != TSDB_CODE_SUCCESS) { return ret; } - + + ret = allocateRuntimeEnvBuf(pRuntimeEnv, pMeter); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + tSidSetSort(pSupporter->pSidSet); vnodeRecordAllFiles(pQInfo, pMeter->vnode); @@ -4057,13 +4140,12 @@ void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t * } } -TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) { +TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) { if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) { return -1; } - - TSKEY *ts = (TSKEY *)pBlock->offset[0]; - return ts[index]; + + return ((TSKEY*)(pRuntimeEnv->primaryColBuffer->data))[index]; } /* @@ -4080,20 +4162,21 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot); // this block must be loaded into buffer - SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; + SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints); SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - bool loadTimestamp = true; - int32_t fileId = pQuery->fileId; - int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order); + int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order); dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); - int32_t ret = - loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true); + bool loadTS = true; + bool loadFields = true; + int32_t slot = pQuery->slot; + + int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[slot], pRuntimeEnv, fileIndex, loadTS, loadFields); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -4135,7 +4218,7 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); dTrace( - "QInfo:%p vid:%d sid:%d id:%s cache block re-allocated to other meter, " + "QInfo:%p vid:%d sid:%d id:%s cache block is assigned to other meter, " "try get query start position in file/cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey); @@ -4146,7 +4229,7 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear */ bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); - dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo, + dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position in file, fileId:%d, slot:%d, pos:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot, pQuery->pos); if (ret) { @@ -4187,8 +4270,8 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn) { SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; + SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; - assert(pQuery->fileId < 0); /* @@ -4208,6 +4291,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste int32_t currentSlot = pCacheInfo->currentSlot; int32_t firstSlot = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo); + if (step == QUERY_DESC_FORWARD_STEP && pQuery->slot == firstSlot) { bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); if (ret) { @@ -4220,7 +4304,6 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste // the skip operation does NOT set the startPos yet // assert(pRuntimeEnv->startPos.fileId < 0); - } else { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); } @@ -4229,7 +4312,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste /* now still iterate the cache data blocks */ pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; - SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); + SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); /* * data in this cache block has been flushed to disk, then we should locate the start position in file. @@ -4242,7 +4325,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste } else { pQuery->pos = (QUERY_IS_ASC_QUERY(pQuery)) ? 0 : pBlock->numOfPoints - 1; - TSKEY startkey = getTimestampInCacheBlock(pBlock, pQuery->pos); + TSKEY startkey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pQuery->pos); if (startkey < 0) { setQueryStatus(pQuery, QUERY_COMPLETED); } @@ -4277,22 +4360,25 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl (step == QUERY_DESC_FORWARD_STEP && (pQuery->slot == 0))) { fileIndex = getNextDataFileCompInfo(pRuntimeEnv, pMeterObj, step); /* data maybe in cache */ - if (fileIndex < 0) { + + if (fileIndex >= 0) { // next file + pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1; + pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1; + } else { // try data in cache assert(pQuery->fileId == -1); + if (step == QUERY_ASC_FORWARD_STEP) { getFirstDataBlockInCache(pRuntimeEnv); - } else { /* no data any more */ + } else { // no data to check for desc order query setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); } return DISK_DATA_LOADED; - } else { - pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1; - pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1; } } else { // next block in the same file int32_t fid = pQuery->fileId; fileIndex = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order); + pQuery->slot += step; pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1; } @@ -4304,14 +4390,11 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl return DISK_DATA_LOADED; } + // load data block function will change the value of pQuery->pos int32_t ret = LoadDatablockOnDemand(&pQuery->pBlock[pQuery->slot], &pQuery->pFields[pQuery->slot], &pRuntimeEnv->blockStatus, pRuntimeEnv, fileIndex, pQuery->slot, searchFn, true); if (ret != DISK_DATA_LOADED) { - /* - * if it is the last block of file, set current access position at the last point of the meter in this file, - * in order to get the correct next access point, - */ return ret; } } else { // data in cache @@ -4321,62 +4404,33 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl return DISK_DATA_LOADED; } -static void doHandleFileBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn, - SData **sdata, int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - - int64_t start = taosGetTimestampUs(); - - SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot); - *pblockInfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK); - - TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; - - if (blockLoadStatus == DISK_DATA_LOADED) { - *forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, (char *)sdata, - pQuery->pFields[pQuery->slot], searchFn, numOfRes); - } else { - *forwardStep = pblockInfo->size; - } - - pSummary->fileTimeUs += (taosGetTimestampUs() - start); -} - -static void doHandleCacheBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn, - int32_t *numOfRes, int32_t *forwardStep) { +static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn, + int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) { SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; SQueryCostSummary *pSummary = &pRuntimeEnv->summary; + TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; int64_t start = taosGetTimestampUs(); - // todo refactor getCacheDataBlock. - //#ifdef _CACHE_INVALID_TEST - // taosMsleep(20000); - //#endif - SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - while (pBlock == NULL) { - getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn); - - if (IS_DISK_DATA_BLOCK(pQuery)) { // do check data block in file - break; + if (IS_DISK_DATA_BLOCK(pQuery)) { + SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot); + *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK); + + if (blockLoadStatus == DISK_DATA_LOADED) { + *forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], searchFn, + numOfRes); } else { - pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); + *forwardStep = pblockInfo->size; } - } - - if (IS_DISK_DATA_BLOCK(pQuery)) { - // start query position is located in file, try query on file block - doHandleFileBlockImpl(pRuntimeEnv, pblockInfo, searchFn, pRuntimeEnv->colDataBuffer, numOfRes, DISK_DATA_LOADED, - forwardStep); - } else { // also query in cache block - *pblockInfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK); - TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0]; + pSummary->fileTimeUs += (taosGetTimestampUs() - start); + } else { + SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); + *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); + *forwardStep = - applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, (char *)pBlock, NULL, searchFn, numOfRes); - + applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes); + pSummary->cacheTimeUs += (taosGetTimestampUs() - start); } } @@ -4389,7 +4443,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int64_t cnt = 0; SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - SData ** sdata = pRuntimeEnv->colDataBuffer; __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; int32_t blockLoadStatus = DISK_DATA_LOADED; @@ -4414,12 +4467,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfRes = 0; SBlockInfo blockInfo = {0}; - - if (IS_DISK_DATA_BLOCK(pQuery)) { - doHandleFileBlockImpl(pRuntimeEnv, &blockInfo, searchFn, sdata, &numOfRes, blockLoadStatus, &forwardStep); - } else { - doHandleCacheBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, &forwardStep); - } + doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep); dTrace("QInfo:%p check data block, brange:%lld-%lld, fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d", GET_QINFO_ADDR(pQuery), blockInfo.keyFirst, blockInfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos, @@ -4470,10 +4518,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } // check next block - void *pNextBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot); + void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - blockInfo = getBlockBasicInfo(pNextBlock, blockType); + blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); if (!checkQueryRangeAgainstNextBlock(&blockInfo, pRuntimeEnv)) { break; } @@ -4484,7 +4532,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { static void updatelastkey(SQuery *pQuery, SMeterQueryInfo *pMeterQInfo) { pMeterQInfo->lastKey = pQuery->lastKey; } -void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus, char *data, +void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus, SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pDataHeadInfoEx, SField *pFields, __block_search_fn_t searchFn) { /* cache blocks may be assign to other meter, abort */ @@ -4497,7 +4545,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32 if (pQuery->nAggTimeInterval == 0) { // not interval query int32_t numOfRes = 0; - applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, data, pFields, searchFn, &numOfRes); + applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, pFields, searchFn, &numOfRes); // note: only fixed number of output for each group by operation if (numOfRes > 0) { @@ -4511,7 +4559,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32 } } else { - applyIntervalQueryOnBlock(pSupporter, pDataHeadInfoEx, data, primaryKeys, pBlockBasicInfo, blockStatus, pFields, + applyIntervalQueryOnBlock(pSupporter, pDataHeadInfoEx, primaryKeys, pBlockBasicInfo, blockStatus, pFields, searchFn); } } @@ -6433,7 +6481,7 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3 } static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo, - SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields, + SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, __block_search_fn_t searchFn) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -6444,7 +6492,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete while (1) { int32_t numOfRes = 0; - int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, sdata, pFields, searchFn, &numOfRes); + int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes); assert(steps > 0); // NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range @@ -6769,7 +6817,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk } } - SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK); + SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK); bool loadTS = needPrimaryTimestampCol(pQuery, &binfo); /* @@ -7013,9 +7061,9 @@ static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY doGetAlignedIntervalQueryRange(pQuery, keyInData, skey, ekey); } -static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, - int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, - SField *pFields, __block_search_fn_t searchFn) { +static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData, + SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, + __block_search_fn_t searchFn) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; SMeterQueryInfo * pInfo = pInfoEx->pMeterQInfo; @@ -7036,7 +7084,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD ((pBlockInfo->keyFirst > pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) { int32_t numOfRes = 0; /* current block is included in this interval */ - int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryData, data, pFields, searchFn, &numOfRes); + int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryData, pFields, searchFn, &numOfRes); assert(numOfRes <= 1 && numOfRes >= 0 && steps > 0); if (pInfo->lastResRows == 0) { @@ -7047,7 +7095,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD saveIntervalQueryRange(pRuntimeEnv, pInfo); } else { - doApplyIntervalQueryOnBlock(pSupporter, pInfo, pBlockInfo, pPrimaryData, data, pFields, searchFn); + doApplyIntervalQueryOnBlock(pSupporter, pInfo, pBlockInfo, pPrimaryData, pFields, searchFn); } } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index da17b0dd18..e9635c4ee6 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -146,9 +146,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe if (pQuery->nAggTimeInterval == 0) { if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - dTrace( - "QInfo:%p vid:%d sid:%d id:%s, query completed, no need to scan data in cache. qrange:%lld-%lld, " - "lastKey:%lld", + dTrace("QInfo:%p vid:%d sid:%d id:%s, query completed, ignore data in cache. qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey); @@ -183,7 +181,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe // data in this block may be flushed to disk and this block is allocated to other meter // todo try with remain cache blocks - SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); + SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); if (pBlock == NULL) { continue; } @@ -196,7 +194,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) { - pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); + pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); /* * 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next @@ -216,8 +214,8 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot); - TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0]; - + TSKEY* primaryKeys = (TSKEY*) pRuntimeEnv->primaryColBuffer->data; + // in handling file data block, the timestamp range validation is done during fetching candidate file blocks if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || (primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { @@ -226,15 +224,14 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe // only record the key on last block SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus); - SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK); + SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); dTrace("QInfo:%p check data block, brange:%lld-%lld, fileId:%d, slot:%d, pos:%d, bstatus:%d", GET_QINFO_ADDR(pQuery), binfo.keyFirst, binfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos, pRuntimeEnv->blockStatus); totalBlocks++; - queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL, - searchFn); + queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn); if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; @@ -425,7 +422,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe continue; } - SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK); + SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK); assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints); TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; @@ -441,8 +438,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe (pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); } - queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pRuntimeEnv->colDataBuffer, &binfo, - pOneMeterDataInfo, pInfoEx->pBlock.fields, searchFn); + queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields, + searchFn); } tfree(pReqMeterDataInfo); @@ -489,6 +486,9 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * pQInfo->pObj = pMeterObj; pQuery->lastKey = pQuery->skey; pRuntimeEnv->pMeterObj = pMeterObj; + + vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); + vnodeUpdateFilterColumnIndex(pQuery); vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache); @@ -619,6 +619,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pSupporter->rawEKey = key; int64_t num = doCheckMetersInGroup(pQInfo, index, start); + if (num == 0) { + int32_t k = 1; + } assert(num >= 0); } else { dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode, @@ -686,7 +689,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } - + bool dataInDisk = true; bool dataInCache = true; if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, 0)) { @@ -725,9 +728,6 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } } - vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); - vnodeUpdateFilterColumnIndex(pQuery); - vnodeScanAllData(pRuntimeEnv); pQuery->pointsRead = getNumOfResult(pRuntimeEnv); -- GitLab