提交 ca136d47 编写于 作者: H hjxilinx

add the support of issue #1131. [tbase-901]

上级 33367d70
......@@ -1307,7 +1307,6 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
}
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
* only for async multi-vnode insertion
......
......@@ -829,7 +829,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd* pCmd) {
char* z = NULL;
if (len > 0) {
z = strstr (pCmd->payload, "invalid sql");
z = strstr (pCmd->payload, "invalid SQL");
}
return z != NULL;
......
......@@ -129,6 +129,7 @@ bool isPointInterpoQuery(SQuery* pQuery);
bool isTopBottomQuery(SQuery* pQuery);
bool isFirstLastRowQuery(SQuery* pQuery);
bool isTSCompQuery(SQuery* pQuery);
bool notHasQueryTimeRange(SQuery *pQuery);
bool needSupplementaryScan(SQuery* pQuery);
bool onDemandLoadDatablock(SQuery* pQuery, int16_t queryRangeSet);
......@@ -172,10 +173,10 @@ void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj* pSupporter);
void copyFromGroupBuf(SQInfo* pQInfo, SOutputRes* result);
SBlockInfo getBlockBasicInfo(void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQuery* pQuery, int32_t slot);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus, char* data,
void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus,
SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
__block_search_fn_t searchFn);
......
......@@ -35,19 +35,19 @@ typedef struct {
int32_t fileId;
} SPositionInfo;
typedef struct SQueryLoadBlockInfo {
typedef struct SLoadDataBlockInfo {
int32_t fileListIndex; /* index of this file in files list of this vnode */
int32_t fileId;
int32_t slotIdx;
int32_t sid;
bool tsLoaded; // if timestamp column of current block is loaded or not
} SQueryLoadBlockInfo;
} SLoadDataBlockInfo;
typedef struct SQueryLoadCompBlockInfo {
typedef struct SLoadCompBlockInfo {
int32_t sid; /* meter sid */
int32_t fileId;
int32_t fileListIndex;
} SQueryLoadCompBlockInfo;
} SLoadCompBlockInfo;
/*
* the header file info for one vnode
......@@ -126,20 +126,28 @@ typedef struct RuntimeEnvironment {
SQuery* pQuery;
SMeterObj* pMeterObj;
SQLFunctionCtx* pCtx;
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
SLoadDataBlockInfo loadBlockInfo; /* record current block load information */
SLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime
* during the query by the subumit/insert handling threads.
* So we keep a copy of the support structure as well as the cache block data itself.
*/
SCacheBlock cacheBlock;
} SQueryRuntimeEnv;
/* intermediate result during multimeter query involves interval */
......
......@@ -54,14 +54,14 @@ enum {
static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset,
int32_t size);
static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo);
static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo);
static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn,
bool loadData);
static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery,
SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterHeadDataInfo,
int32_t start, int32_t end);
static TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index);
static TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index);
static TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index);
static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos);
......@@ -71,11 +71,11 @@ static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pRes
static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey);
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn);
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn);
......@@ -221,7 +221,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
SQuery *pQuery = pRuntimeEnv->pQuery;
// check if data file header of this table has been loaded into memory, avoid to reloaded comp Block info
SQueryLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo;
SLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo;
// if vnodeFreeFields is called, the pQuery->pFields is NULL
if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid &&
......@@ -235,14 +235,14 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
}
static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex, int32_t sid) {
SQueryLoadCompBlockInfo *pLoadCompBlockInfo = &pRuntimeEnv->loadCompBlockInfo;
pLoadCompBlockInfo->sid = sid;
pLoadCompBlockInfo->fileListIndex = fileIndex;
pLoadCompBlockInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID;
SLoadCompBlockInfo *pCompBlockLoadInfo = &pRuntimeEnv->loadCompBlockInfo;
pCompBlockLoadInfo->sid = sid;
pCompBlockLoadInfo->fileListIndex = fileIndex;
pCompBlockLoadInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID;
}
static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo) {
static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->sid = -1;
pCompBlockLoadInfo->fileId = -1;
pCompBlockLoadInfo->fileListIndex = -1;
......@@ -251,13 +251,11 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn
static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool loadPrimaryTS) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
/* this block has been loaded into memory, return directly */
if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 &&
pLoadInfo->sid == pMeterObj->sid) {
assert(fileIndex == pLoadInfo->fileListIndex);
pLoadInfo->sid == pMeterObj->sid && pLoadInfo->fileListIndex == fileIndex) {
// previous load operation does not load the primary timestamp column, we only need to load the timestamp column
if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) {
return DISK_BLOCK_LOAD_TS;
......@@ -272,7 +270,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *
static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool tsLoaded) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
pLoadInfo->fileId = pQuery->fileId;
pLoadInfo->slotIdx = pQuery->slot;
......@@ -281,7 +279,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
pLoadInfo->tsLoaded = tsLoaded;
}
static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
static void vnodeInitDataBlockInfo(SLoadDataBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->sid = -1;
......@@ -990,7 +988,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
}
// todo ignore the blockType, pass the pQuery into this function
SBlockInfo getBlockBasicInfo(void *pBlock, int32_t blockType) {
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void *pBlock, int32_t blockType) {
SBlockInfo blockInfo = {0};
if (IS_FILE_BLOCK(blockType)) {
SCompBlock *pDiskBlock = (SCompBlock *)pBlock;
......@@ -1002,8 +1000,8 @@ SBlockInfo getBlockBasicInfo(void *pBlock, int32_t blockType) {
} else {
SCacheBlock *pCacheBlock = (SCacheBlock *)pBlock;
blockInfo.keyFirst = getTimestampInCacheBlock(pCacheBlock, 0);
blockInfo.keyLast = getTimestampInCacheBlock(pCacheBlock, pCacheBlock->numOfPoints - 1);
blockInfo.keyFirst = getTimestampInCacheBlock(pRuntimeEnv, pCacheBlock, 0);
blockInfo.keyLast = getTimestampInCacheBlock(pRuntimeEnv, pCacheBlock, pCacheBlock->numOfPoints - 1);
blockInfo.size = pCacheBlock->numOfPoints;
blockInfo.numOfCols = pCacheBlock->pMeterObj->numOfColumns;
}
......@@ -1073,14 +1071,37 @@ void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, in
position->pos = pos;
}
static FORCE_INLINE void saveNextAccessPositionInCache(SPositionInfo *position, int32_t slotIdx, int32_t pos) {
savePointPosition(position, -1, slotIdx, pos);
bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj) {
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
SMeterObj* pNewMeterObj = pBlock->pMeterObj;
char* id = (pNewMeterObj != NULL)? pNewMeterObj->meterId:NULL;
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, "
"blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks);
return false;
}
if (pBlock->numOfPoints == 0) {
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d,"
"allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks);
return false;
}
return true;
}
// todo all functions that call this function should check the returned data blocks status
SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slot) {
SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0) {
if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0 || slot >= pCacheInfo->maxBlocks) {
return NULL;
}
......@@ -1088,21 +1109,88 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot];
if (pBlock == NULL) {
dError("QInfo:%p NULL Block In Cache, available block:%d, last block:%d, accessed null block:%d, pBlockId:%d",
GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId);
// the cache info snapshot must be existed.
int32_t curNumOfBlocks = pCacheInfo->numOfBlocks;
int32_t curSlot = pCacheInfo->currentSlot;
dError("QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, "
"last block:%d), accessed null block:%d, pBlockId:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks,
pQuery->currentSlot, curNumOfBlocks, curSlot, slot, pQuery->blockId);
return NULL;
}
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
dWarn(
"QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, "
"blockMeterObj:%p",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId, pMeterObj, pBlock->pMeterObj);
// block is empty or block does not belongs to current table, return NULL value
if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) {
return NULL;
}
return pBlock;
//the accessed cache block has been loaded already, return directly
if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD) {
TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0);
TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, "
"slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1,
pQuery->slot, skey, ekey, pBlock->numOfPoints);
return &pRuntimeEnv->cacheBlock;
}
// keep the structure as well as the block data into local buffer
memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock));
// keep the data from in cache into the temporarily allocated buffer
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i];
int16_t columnIndex = pColumnInfoEx->colIdx;
int16_t columnIndexInBuf = pColumnInfoEx->colIdxInBuf;
SColumn* pCol = &pMeterObj->schema[columnIndex];
int16_t bytes = pCol->bytes;
int16_t type = pCol->type;
char* dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data;
if (pQuery->colList[i].colIdx != -1) {
assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes &&
type == pColumnInfoEx->data.type);
memcpy(dst, pBlock->offset[columnIndex], pBlock->numOfPoints * bytes);
} else {
setNullN(dst, type, bytes, pBlock->numOfPoints);
}
}
// if the primary timestamp are not loaded by default, always load it here into buffer
if(!PRIMARY_TSCOL_LOADED(pQuery)) {
memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0], TSDB_KEYSIZE*pBlock->numOfPoints);
}
pQuery->fileId = -1;
pQuery->slot = slot;
if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) {
return NULL;
}
/*
* the accessed cache block still belongs to current meterObj, go on
* update the load data block info
*/
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, -1, true);
TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0);
TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load cache block, ts:%d, slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1,
pQuery->slot, skey, ekey, pBlock->numOfPoints);
return &pRuntimeEnv->cacheBlock;
}
static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) {
......@@ -1110,11 +1198,13 @@ static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) {
return &pQuery->pBlock[slot];
}
static void *getGenericDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slot) {
static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (IS_DISK_DATA_BLOCK(pQuery)) {
return getDiskDataBlock(pQuery, slot);
} else {
return getCacheDataBlock(pMeterObj, pQuery, slot);
return getCacheDataBlock(pMeterObj, pRuntimeEnv, slot);
}
}
......@@ -1203,14 +1293,6 @@ static bool getQualifiedDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRunti
return true;
}
static char *doGetDataBlockImpl(const char *sdata, int32_t colIdx, bool isDiskFileBlock) {
if (isDiskFileBlock) {
return ((SData **)sdata)[colIdx]->data;
} else {
return ((SCacheBlock *)sdata)->offset[colIdx];
}
}
static SField *getFieldInfo(SQuery *pQuery, SBlockInfo *pBlockInfo, SField *pFields, int32_t column) {
// no SField info exist, or column index larger than the output column, no result.
if (pFields == NULL || column >= pQuery->numOfOutputCols) {
......@@ -1261,30 +1343,13 @@ static bool hasNullVal(SQuery *pQuery, int32_t col, SBlockInfo *pBlockInfo, SFie
return ret;
}
static char *doGetDataBlocks(bool isDiskFileBlock, SQueryRuntimeEnv *pRuntimeEnv, char *data, int32_t colIdx,
int32_t colId, int16_t type, int16_t bytes, int32_t tmpBufIndex) {
char *pData = NULL;
if (isDiskFileBlock) {
pData = doGetDataBlockImpl(data, colIdx, isDiskFileBlock);
} else {
SCacheBlock *pCacheBlock = (SCacheBlock *)data;
SMeterObj * pMeter = pRuntimeEnv->pMeterObj;
if (colIdx < 0 || pMeter->numOfColumns <= colIdx || pMeter->schema[colIdx].colId != colId) {
// data in cache is not current available, we need fill the data block in null value
pData = pRuntimeEnv->colDataBuffer[tmpBufIndex]->data;
setNullN(pData, type, bytes, pCacheBlock->numOfPoints);
} else {
pData = doGetDataBlockImpl(data, colIdx, isDiskFileBlock);
}
}
static char *doGetDataBlocks(SQuery* pQuery, SData** data, int32_t colIdx) {
assert(colIdx >= 0 && colIdx < pQuery->numOfCols);
char* pData = data[colIdx]->data;
return pData;
}
static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeticSupport *sas, int32_t col,
int32_t size, bool isDiskFileBlock) {
static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
......@@ -1303,21 +1368,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti
}
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t colIdx = isDiskFileBlock ? pQuery->colList[i].colIdxInBuf : pQuery->colList[i].colIdx;
SColumnInfo *pColMsg = &pQuery->colList[i].data;
char * pData = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pColMsg->colId, pColMsg->type,
pColMsg->bytes, pQuery->colList[i].colIdxInBuf);
char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
sas->elemSize[i] = pColMsg->bytes;
sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
}
sas->numOfCols = pQuery->numOfCols;
sas->offset = 0;
} else { // other type of query function
SColIndexEx *pCol = &pQuery->pSelectExpr[col].pBase.colInfo;
int32_t colIdx = isDiskFileBlock ? pCol->colIdxInBuf : pCol->colIdx;
if (TSDB_COL_IS_TAG(pCol->flag)) {
dataBlock = NULL;
} else {
......@@ -1326,8 +1387,7 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti
* the remain meter may not have the required column in cache actually.
* So, the validation of required column in cache with the corresponding meter schema is reinforced.
*/
dataBlock = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pCol->colId, pCtx[col].inputType,
pCtx[col].inputBytes, pCol->colIdxInBuf);
dataBlock = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pCol->colIdxInBuf);
}
}
......@@ -1339,17 +1399,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, char *data, SArithmeti
* @param pRuntimeEnv
* @param forwardStep
* @param primaryKeyCol
* @param data
* @param pFields
* @param isDiskFileBlock
* @return the incremental number of output value, so it maybe 0 for fixed number of query,
* such as count/min/max etc.
*/
static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, TSKEY *primaryKeyCol,
char *data, SField *pFields, SBlockInfo *pBlockInfo, bool isDiskFileBlock) {
SField *pFields, SBlockInfo *pBlockInfo) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv);
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
......@@ -1360,8 +1420,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
SField dummyField = {0};
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, forwardStep, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep);
SField *tpField = NULL;
if (pFields != NULL) {
......@@ -1496,59 +1556,32 @@ static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
return TSDB_CODE_SUCCESS;
}
static char *getGroupbyColumnData(SQueryRuntimeEnv *pRuntimeEnv, SField *pFields, SBlockInfo *pBlockInfo, char *data,
bool isDiskFileBlock, int16_t *type, int16_t *bytes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, int16_t* bytes) {
char * groupbyColumnData = NULL;
int32_t col = 0;
int16_t colIndexInBuf = 0;
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
if (pGroupbyExpr->columnInfo[k].flag == TSDB_COL_TAG) {
continue;
}
int16_t colIndex = -1;
int32_t colId = pGroupbyExpr->columnInfo[k].colId;
if (isDiskFileBlock) { // get the required column data in file block according the column ID
for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
if (colId == pFields[i].colId) {
*type = pFields[i].type;
*bytes = pFields[i].bytes;
col = i;
break;
}
}
// this column may not in current data block and also not in the required columns list
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (colId == pQuery->colList[i].data.colId) {
colIndexInBuf = i;
break;
}
}
} else { // get the required data column in cache
SColumn *pSchema = pRuntimeEnv->pMeterObj->schema;
for (int32_t i = 0; i < pRuntimeEnv->pMeterObj->numOfColumns; ++i) {
if (colId == pSchema[i].colId) {
*type = pSchema[i].type;
*bytes = pSchema[i].bytes;
col = i;
colIndexInBuf = i;
break;
}
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].data.colId == colId) {
colIndex = i;
break;
}
}
int32_t columnIndex = isDiskFileBlock ? colIndexInBuf : col;
groupbyColumnData =
doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, columnIndex, colId, *type, *bytes, colIndexInBuf);
assert(colIndex >= 0 && colIndex < pQuery->numOfCols);
*type = pQuery->colList[colIndex].data.type;
*bytes = pQuery->colList[colIndex].data.bytes;
groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].colIdxInBuf);
break;
}
......@@ -1608,9 +1641,12 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
}
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, TSKEY *primaryKeyCol,
char *data, SField *pFields, SBlockInfo *pBlockInfo, bool isDiskFileBlock) {
SField *pFields, SBlockInfo *pBlockInfo) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
SData **data = pRuntimeEnv->colDataBuffer;
int64_t prevNumOfRes = 0;
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
......@@ -1626,15 +1662,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
char *groupbyColumnData = NULL;
if (groupbyStateValue) {
groupbyColumnData = getGroupbyColumnData(pRuntimeEnv, pFields, pBlockInfo, data, isDiskFileBlock, &type, &bytes);
groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, *forwardStep, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp =
taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
......@@ -1646,15 +1682,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
// set the input column data
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
int32_t colIdx = isDiskFileBlock ? pFilterInfo->info.colIdxInBuf : pFilterInfo->info.colIdx;
SColumnInfo * pColumnInfo = &pFilterInfo->info.data;
/*
* NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column,
* so we do NOT check if is a tag or not
*/
pFilterInfo->pData = doGetDataBlocks(isDiskFileBlock, pRuntimeEnv, data, colIdx, pColumnInfo->colId,
pColumnInfo->type, pColumnInfo->bytes, pFilterInfo->info.colIdxInBuf);
pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
}
int32_t numOfRes = 0;
......@@ -1803,7 +1835,7 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY
}
static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn,
char *sdata, SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes) {
SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes) {
int32_t forwardStep = 0;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -1856,14 +1888,11 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step;
}
bool isFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
*numOfRes =
rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, sdata, pFields, pBlockInfo, isFileBlock);
rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo);
} else {
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, sdata, pFields, pBlockInfo,
isFileBlock);
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo);
}
assert(*numOfRes >= 0);
......@@ -1905,7 +1934,7 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
if (order == TSQL_SO_ASC) {
int32_t i = 0;
int32_t step = 1;
int32_t step = QUERY_ASC_FORWARD_STEP;
while (i<numOfFiles && * fid> pVnodeFiles->pFileInfo[i].fileID) {
i += step;
......@@ -1919,7 +1948,7 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
}
} else {
int32_t i = numOfFiles - 1;
int32_t step = -1;
int32_t step = QUERY_DESC_FORWARD_STEP;
while (i >= 0 && *fid < pVnodeFiles->pFileInfo[i].fileID) {
i += step;
......@@ -1938,13 +1967,13 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->fileId += step;
int32_t fid = 0;
int32_t fileIndex = 0;
int32_t order = (step == QUERY_ASC_FORWARD_STEP) ? TSQL_SO_ASC : TSQL_SO_DESC;
while (1) {
fid = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, order);
fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, order);
// no files left, abort
if (fid < 0) {
if (fileIndex < 0) {
if (step == QUERY_ASC_FORWARD_STEP) {
dTrace("QInfo:%p no file to access, try data in cache", GET_QINFO_ADDR(pQuery));
} else {
......@@ -1956,9 +1985,8 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
break;
}
// failed to mmap header file into memory will cause the retrieval of compblock info failed
if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) {
if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIndex) > 0) {
break;
}
......@@ -1971,15 +1999,15 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
pQuery->fileId += step;
/* for backwards search, if the first file is not valid, abort */
if (step < 0 && fid == 0) {
if (step < 0 && fileIndex == 0) {
vnodeFreeFieldsEx(pRuntimeEnv);
pQuery->fileId = -1;
fid = -1;
fileIndex = -1;
break;
}
}
return fid;
return fileIndex;
}
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData,
......@@ -2165,60 +2193,12 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
// for loading block data in memory
assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
if (pRuntimeEnv->colDataBuffer[i] == NULL) {
goto _error_clean;
}
}
// record the maximum column width among columns of this meter/metric
int32_t maxColWidth = pQuery->colList[0].data.bytes;
for (int32_t i = 1; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
if (bytes > maxColWidth) {
maxColWidth = bytes;
}
}
pRuntimeEnv->primaryColBuffer = NULL;
if (PRIMARY_TSCOL_LOADED(pQuery)) {
pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0];
} else {
pRuntimeEnv->primaryColBuffer =
(SData *)malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES);
}
pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes
pRuntimeEnv->unzipBuffer = (char *)malloc(pRuntimeEnv->unzipBufSize);
pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize);
if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL ||
pRuntimeEnv->primaryColBuffer == NULL) {
goto _error_clean;
}
return TSDB_CODE_SUCCESS;
_error_clean:
tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx);
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]);
}
tfree(pRuntimeEnv->unzipBuffer);
tfree(pRuntimeEnv->secondaryUnzipBuffer);
if (!PRIMARY_TSCOL_LOADED(pQuery)) {
tfree(pRuntimeEnv->primaryColBuffer);
}
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -2373,6 +2353,11 @@ bool isFirstLastRowQuery(SQuery *pQuery) {
return false;
}
bool notHasQueryTimeRange(SQuery *pQuery) {
return (pQuery->skey == 0 && pQuery->ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
}
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; }
bool needSupplementaryScan(SQuery *pQuery) {
......@@ -2447,11 +2432,13 @@ static int32_t getFirstCacheSlot(int32_t numOfBlocks, int32_t lastSlot, SCacheIn
return (lastSlot - numOfBlocks + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
}
static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) {
static bool cacheBoundaryCheck(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj *pMeterObj) {
/*
* here we get the first slot from the meter cache, not from the cache snapshot from pQuery, since the
* snapshot value in pQuery may have been expired now.
*/
SQuery* pQuery = pRuntimeEnv->pQuery;
SCacheInfo * pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
SCacheBlock *pBlock = NULL;
......@@ -2475,8 +2462,8 @@ static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) {
* pBlock may be null value since this block is flushed to disk, and re-distributes to
* other meter, so go on until we get the first not flushed cache block.
*/
if ((pBlock = getCacheDataBlock(pMeterObj, pQuery, first)) != NULL) {
keyFirst = getTimestampInCacheBlock(pBlock, 0);
if ((pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, first)) != NULL) {
keyFirst = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0);
break;
} else {
/*
......@@ -2519,7 +2506,6 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v
pQuery->currentSlot = lastSlot;
pQuery->numOfBlocks = numOfBlocks;
pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo);
;
/*
* Note: the block id is continuous increasing, never becomes smaller.
......@@ -2553,7 +2539,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
assert((pQuery->lastKey >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery)));
if (!ignoreQueryRange && !cacheBoundaryCheck(pQuery, pMeterObj)) {
if (!ignoreQueryRange && !cacheBoundaryCheck(pRuntimeEnv, pMeterObj)) {
return -1;
}
......@@ -2568,8 +2554,13 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
/* locate the first point of which time stamp is no less than pQuery->skey */
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[*slot];
(*pos) = searchFn(pBlock->offset[0], pBlock->numOfPoints, pQuery->skey, pQuery->order.order);
pQuery->slot = *slot;
SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
if (pBlock == NULL) {
return -1;
}
(*pos) = searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->skey, pQuery->order.order);
// restore skey before return
pQuery->skey = rawskey;
......@@ -2579,7 +2570,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
return -1;
}
int64_t nextKey = getTimestampInCacheBlock(pBlock, *pos);
int64_t nextKey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, *pos);
if ((nextKey < pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextKey > pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
// all data are less than the pQuery->lastKey(pQuery->sKey) for asc query
......@@ -2631,7 +2622,7 @@ bool hasDataInCache(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) {
return false;
}
return cacheBoundaryCheck(pQuery, pMeterObj);
return cacheBoundaryCheck(pRuntimeEnv, pMeterObj);
}
/**
......@@ -2699,7 +2690,7 @@ static void doGetAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey
pQuery->lastKey = pQuery->skey;
}
static void getOneRowFromDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) {
static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
......@@ -2708,32 +2699,6 @@ static void getOneRowFromDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, in
}
}
static void getOneRowFromCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, SCacheBlock *pBlock,
char **dst, int32_t pos) {
SQuery *pQuery = pRuntimeEnv->pQuery;
/*
* in case of cache block expired, the pos may exceed the number of points in block, so check
* the range in the first place.
*/
if (pos > pBlock->numOfPoints) {
pos = pBlock->numOfPoints;
}
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t colIdx = pQuery->colList[i].colIdx;
int32_t colId = pQuery->colList[i].data.colId;
SColumn *pCols = &pMeterObj->schema[colIdx];
if (colIdx < 0 || colIdx >= pMeterObj->numOfColumns || pCols->colId != colId) { // set null
setNull(dst[i], pCols->type, pCols->bytes);
} else {
memcpy(dst[i], pBlock->offset[colIdx] + pos * pCols->bytes, pCols->bytes);
}
}
}
static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMeterObj,
SPointInterpoSupporter *pPointInterpSupporter) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
......@@ -2759,29 +2724,7 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
pQuery->slot, pQuery->pos);
// save the point that is directly after or equals to the specified point
if (IS_DISK_DATA_BLOCK(pQuery)) {
getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos);
} else {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
while (pBlock == NULL) {
// cache block is flushed to disk, try to find new query position again
getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn);
// new position is located in file, load data and abort
if (IS_DISK_DATA_BLOCK(pQuery)) {
getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos);
break;
} else {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
}
}
if (!IS_DISK_DATA_BLOCK(pQuery)) {
getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pNextPoint, pQuery->pos);
}
}
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pNextPoint, pQuery->pos);
/*
* 1. for last_row query, return immediately.
......@@ -2810,12 +2753,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
if (pQuery->pos > 0) {
int32_t prevPos = pQuery->pos - 1;
if (IS_DISK_DATA_BLOCK(pQuery)) {
/* save the point that is directly after the specified point */
getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos);
} else {
getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pPrevPoint, prevPos);
}
/* save the point that is directly after the specified point */
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos);
} else {
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
......@@ -2825,7 +2764,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true);
/*
* no previous data exists reset the status and load the data block that contains the qualified point
* no previous data exists.
* reset the status and load the data block that contains the qualified point
*/
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%lld-%lld, out of range",
......@@ -2838,21 +2778,20 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
} else { // prev has been located
if (pQuery->fileId >= 0) {
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1;
getOneRowFromDiskBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos);
} else {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
if (pBlock == NULL) {
// todo nothing, the previous block is flushed to disk
} else {
pQuery->pos = pBlock->numOfPoints - 1;
getOneRowFromCacheBlock(pRuntimeEnv, pMeterObj, pBlock, pPointInterpSupporter->pPrevPoint, pQuery->pos);
// moveToNextBlock make sure there is a available cache block, if exists
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
pBlock = &pRuntimeEnv->cacheBlock;
pQuery->pos = pBlock->numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos);
}
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos);
}
}
}
......@@ -2886,6 +2825,86 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn
}
}
static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter,
SPointInterpoSupporter *pPointInterpSupporter, SMeterObj *pMeterObj,TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (isFirstLastRowQuery(pQuery)) {
/*
* if the pQuery->skey != pQuery->ekey for last_row query,
* the query range is existed, so set them both the value of nextKey
*/
if (pQuery->skey != pQuery->ekey) {
assert(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery) && nextKey >= pQuery->ekey &&
nextKey <= pQuery->skey);
pQuery->skey = nextKey;
pQuery->ekey = nextKey;
}
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey);
return true;
}
}
//TODO refactor code, the best way to implement the last_row is utilizing the iterator
bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery));
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
TSKEY lastKey = -1;
// todo copy data into temp buffer to avoid the buffer expired
pQuery->fileId = -1;
vnodeFreeFieldsEx(pRuntimeEnv);
// keep in-memory cache status in local variables in case that it may be changed by write operation
getBasicCacheInfoSnapshot(pQuery, pMeterObj->pCache, pMeterObj->vnode);
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) {
pQuery->fileId = -1;
TSKEY key = pMeterObj->lastKey;
pQuery->skey = key;
pQuery->ekey = key;
pQuery->lastKey = pQuery->skey;
// todo cache block may have been flushed to disk, and no data in cache anymore.
// So, copy cache block to local buffer is required.
lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
} else { // no data in cache, try file
TSKEY key = pMeterObj->lastKeyOnFile;
pQuery->skey = key;
pQuery->ekey = key;
pQuery->lastKey = pQuery->skey;
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
if (!ret) { // no data in file, return false;
return false;
}
lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
assert(lastKey <= pQuery->skey);
pQuery->skey = lastKey;
pQuery->ekey = lastKey;
pQuery->lastKey = pQuery->skey;
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
}
/**
* determine the first query range, according to raw query range [skey, ekey] and group-by interval.
* the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than
......@@ -2903,7 +2922,7 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(key >= pQuery->skey);
return doGetQueryPos(key, pSupporter, pPointInterpSupporter);
}
......@@ -2918,30 +2937,26 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter);
} else { // descending order
} else { // descending order
if (dataInCache) { // todo handle error
TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
assert(nextKey == -1 || nextKey <= pQuery->skey);
// valid data in cache
if (nextKey != -1) {
if (nextKey != -1) { // find qualified data in cache
if (nextKey >= pQuery->ekey) {
if (isFirstLastRowQuery(pQuery)) {
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey);
return true;
}
return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey);
} else {
/*
* nextKey < pQuery->ekey && nextKey < pQuery->lastKey, query range is
* larger than all data, abort NOTE: Interp query does not reach here, since for all interp query,
* larger than all data, abort
*
* NOTE: Interp query does not reach here, since for all interp query,
* the query order is ascending order.
*/
return false;
}
} else { // all data in cache are greater than pQuery->lastKey, try file
} else { // all data in cache are greater than pQuery->skey, try file
}
}
......@@ -2949,19 +2964,11 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(key <= pQuery->skey);
/* key in query range. If not, no qualified in disk file */
// key in query range. If not, no qualified in disk file
if (key >= pQuery->ekey) {
if (isFirstLastRowQuery(pQuery)) { /* no qualified data in this query range */
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
getAlignedIntervalQueryRange(pQuery, key, pQuery->skey, pQuery->ekey);
return true;
}
} else { // Goes on in case of key in file less than pMeterObj->lastKey,
// which is also the pQuery->skey
if (isFirstLastRowQuery(pQuery)) {
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
}
return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, key);
} else { //In case of all queries, the value of false will be returned if key < pQuery->ekey
return false;
}
}
}
......@@ -2987,9 +2994,9 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p
return -1;
}
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
if (pBlock != NULL) {
nextTimestamp = getTimestampInCacheBlock(pBlock, position->pos);
nextTimestamp = getTimestampInCacheBlock(pRuntimeEnv, pBlock, position->pos);
} else {
// todo fix it
}
......@@ -3172,12 +3179,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInf
}
// update the pQuery->limit.offset value, and pQuery->pos value
TSKEY *keys = NULL;
if (IS_DISK_DATA_BLOCK(pQuery)) {
keys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
} else {
keys = (TSKEY *)(((SCacheBlock *)pBlock)->offset[0]);
}
TSKEY *keys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int32_t i = 0;
if (QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -3218,7 +3220,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInf
if (IS_DISK_DATA_BLOCK(pQuery)) {
pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else {
pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos);
pQuery->skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pQuery->pos);
}
// update the offset value
......@@ -3252,16 +3254,22 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB
static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] =
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
const char* msg = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
"new qrange:%lld-%lld";
// descending order query
// descending order query for last_row query
if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery),
pQuery->order.order, TSQL_SO_DESC);
pQuery->order.order = TSQL_SO_DESC;
int64_t skey = MIN(pQuery->skey, pQuery->ekey);
int64_t ekey = MAX(pQuery->skey, pQuery->ekey);
pQuery->skey = ekey;
pQuery->ekey = skey;
return;
}
......@@ -3334,11 +3342,11 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) {
break;
}
void *pBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot);
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
assert(pBlock != NULL);
int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType);
SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType);
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
assert(maxReads >= 0);
......@@ -3367,10 +3375,10 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) {
return;
}
void *pBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot);
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType);
SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType);
// get the qualified data that can be skipped
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
......@@ -3560,7 +3568,6 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
pCtx->numOfParams = 4;
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
......@@ -3712,10 +3719,70 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue
return TSDB_CODE_SUCCESS;
}
static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj) {
SQuery* pQuery = pRuntimeEnv->pQuery;
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
if (pRuntimeEnv->colDataBuffer[i] == NULL) {
goto _error_clean;
}
}
// record the maximum column width among columns of this meter/metric
int32_t maxColWidth = pQuery->colList[0].data.bytes;
for (int32_t i = 1; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
if (bytes > maxColWidth) {
maxColWidth = bytes;
}
}
pRuntimeEnv->primaryColBuffer = NULL;
if (PRIMARY_TSCOL_LOADED(pQuery)) {
pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0];
} else {
pRuntimeEnv->primaryColBuffer =
(SData *) malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES);
}
pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes
pRuntimeEnv->unzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize);
pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize);
if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL ||
pRuntimeEnv->primaryColBuffer == NULL) {
goto _error_clean;
}
return TSDB_CODE_SUCCESS;
_error_clean:
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]);
}
tfree(pRuntimeEnv->unzipBuffer);
tfree(pRuntimeEnv->secondaryUnzipBuffer);
if (!PRIMARY_TSCOL_LOADED(pQuery)) {
tfree(pRuntimeEnv->primaryColBuffer);
}
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter,
void *param) {
SQuery *pQuery = &pQInfo->query;
int32_t code = TSDB_CODE_SUCCESS;
/*
* only the successful complete requries the sem_post/over = 1 operations.
*/
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey > pQuery->ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->ekey > pQuery->skey))) {
dTrace("QInfo:%p no result in time range %lld-%lld, order %d", pQInfo, pQuery->skey, pQuery->ekey,
......@@ -3723,7 +3790,6 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
return TSDB_CODE_SUCCESS;
}
......@@ -3748,6 +3814,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pMeterObj = pMeterObj;
if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) {
return code;
}
vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, &dataInDisk, &dataInCache);
......@@ -3756,8 +3827,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
dTrace("QInfo:%p no result in query", pQInfo);
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
return TSDB_CODE_SUCCESS;
return code;
}
pRuntimeEnv->pTSBuf = param;
......@@ -3768,16 +3838,16 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
}
// create runtime environment
int32_t ret = setupQueryRuntimeEnv(pMeterObj, pQuery, &pSupporter->runtimeEnv, NULL, pQuery->order.order, false);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
code = setupQueryRuntimeEnv(pMeterObj, pQuery, &pSupporter->runtimeEnv, NULL, pQuery->order.order, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) {
return ret;
if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) {
return code;
}
int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
......@@ -3788,34 +3858,42 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
pRuntimeEnv->usedIndex = 0;
pRuntimeEnv->pResult = pSupporter->pResult;
}
// in case of last_row query, we set the query timestamp to pMeterObj->lastKey;
if (isFirstLastRowQuery(pQuery)) {
pQuery->skey = pMeterObj->lastKey;
pQuery->ekey = pMeterObj->lastKey;
pQuery->lastKey = pQuery->skey;
}
pSupporter->rawSKey = pQuery->skey;
pSupporter->rawEKey = pQuery->ekey;
/* query on single table */
pSupporter->numOfMeters = 1;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SPointInterpoSupporter interpInfo = {0};
pointInterpSupporterInit(pQuery, &interpInfo);
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
/*
* in case of last_row query without query range, we set the query timestamp to
* pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged.
*/
if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) {
if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
} else {
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
}
/*
* here we set the value for before and after the specified time into the
* parameter for interpolation query
......@@ -3949,12 +4027,17 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
}
int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = allocateRuntimeEnvBuf(pRuntimeEnv, pMeter);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
tSidSetSort(pSupporter->pSidSet);
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
......@@ -4057,13 +4140,12 @@ void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t *
}
}
TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) {
TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) {
if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) {
return -1;
}
TSKEY *ts = (TSKEY *)pBlock->offset[0];
return ts[index];
return ((TSKEY*)(pRuntimeEnv->primaryColBuffer->data))[index];
}
/*
......@@ -4080,20 +4162,21 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot);
// this block must be loaded into buffer
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
bool loadTimestamp = true;
int32_t fileId = pQuery->fileId;
int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order);
int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
int32_t ret =
loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true);
bool loadTS = true;
bool loadFields = true;
int32_t slot = pQuery->slot;
int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[slot], pRuntimeEnv, fileIndex, loadTS, loadFields);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -4135,7 +4218,7 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
dTrace(
"QInfo:%p vid:%d sid:%d id:%s cache block re-allocated to other meter, "
"QInfo:%p vid:%d sid:%d id:%s cache block is assigned to other meter, "
"try get query start position in file/cache, qrange:%lld-%lld, lastKey:%lld",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery->lastKey);
......@@ -4146,7 +4229,7 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear
*/
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo,
dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position in file, fileId:%d, slot:%d, pos:%d", pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot, pQuery->pos);
if (ret) {
......@@ -4187,8 +4270,8 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear
static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
assert(pQuery->fileId < 0);
/*
......@@ -4208,6 +4291,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste
int32_t currentSlot = pCacheInfo->currentSlot;
int32_t firstSlot = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo);
if (step == QUERY_DESC_FORWARD_STEP && pQuery->slot == firstSlot) {
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
if (ret) {
......@@ -4220,7 +4304,6 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste
// the skip operation does NOT set the startPos yet
// assert(pRuntimeEnv->startPos.fileId < 0);
} else {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
}
......@@ -4229,7 +4312,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste
/* now still iterate the cache data blocks */
pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
/*
* data in this cache block has been flushed to disk, then we should locate the start position in file.
......@@ -4242,7 +4325,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste
} else {
pQuery->pos = (QUERY_IS_ASC_QUERY(pQuery)) ? 0 : pBlock->numOfPoints - 1;
TSKEY startkey = getTimestampInCacheBlock(pBlock, pQuery->pos);
TSKEY startkey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pQuery->pos);
if (startkey < 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
......@@ -4277,22 +4360,25 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
(step == QUERY_DESC_FORWARD_STEP && (pQuery->slot == 0))) {
fileIndex = getNextDataFileCompInfo(pRuntimeEnv, pMeterObj, step);
/* data maybe in cache */
if (fileIndex < 0) {
if (fileIndex >= 0) { // next file
pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1;
pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1;
} else { // try data in cache
assert(pQuery->fileId == -1);
if (step == QUERY_ASC_FORWARD_STEP) {
getFirstDataBlockInCache(pRuntimeEnv);
} else { /* no data any more */
} else { // no data to check for desc order query
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
}
return DISK_DATA_LOADED;
} else {
pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1;
pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1;
}
} else { // next block in the same file
int32_t fid = pQuery->fileId;
fileIndex = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order);
pQuery->slot += step;
pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1;
}
......@@ -4304,14 +4390,11 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
return DISK_DATA_LOADED;
}
// load data block function will change the value of pQuery->pos
int32_t ret =
LoadDatablockOnDemand(&pQuery->pBlock[pQuery->slot], &pQuery->pFields[pQuery->slot], &pRuntimeEnv->blockStatus,
pRuntimeEnv, fileIndex, pQuery->slot, searchFn, true);
if (ret != DISK_DATA_LOADED) {
/*
* if it is the last block of file, set current access position at the last point of the meter in this file,
* in order to get the correct next access point,
*/
return ret;
}
} else { // data in cache
......@@ -4321,62 +4404,33 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
return DISK_DATA_LOADED;
}
static void doHandleFileBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn,
SData **sdata, int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int64_t start = taosGetTimestampUs();
SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK);
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
if (blockLoadStatus == DISK_DATA_LOADED) {
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, (char *)sdata,
pQuery->pFields[pQuery->slot], searchFn, numOfRes);
} else {
*forwardStep = pblockInfo->size;
}
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
}
static void doHandleCacheBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn,
int32_t *numOfRes, int32_t *forwardStep) {
static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn,
int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int64_t start = taosGetTimestampUs();
// todo refactor getCacheDataBlock.
//#ifdef _CACHE_INVALID_TEST
// taosMsleep(20000);
//#endif
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
while (pBlock == NULL) {
getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn);
if (IS_DISK_DATA_BLOCK(pQuery)) { // do check data block in file
break;
if (IS_DISK_DATA_BLOCK(pQuery)) {
SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
if (blockLoadStatus == DISK_DATA_LOADED) {
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], searchFn,
numOfRes);
} else {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
*forwardStep = pblockInfo->size;
}
}
if (IS_DISK_DATA_BLOCK(pQuery)) {
// start query position is located in file, try query on file block
doHandleFileBlockImpl(pRuntimeEnv, pblockInfo, searchFn, pRuntimeEnv->colDataBuffer, numOfRes, DISK_DATA_LOADED,
forwardStep);
} else { // also query in cache block
*pblockInfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK);
TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0];
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else {
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
*forwardStep =
applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, (char *)pBlock, NULL, searchFn, numOfRes);
applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes);
pSummary->cacheTimeUs += (taosGetTimestampUs() - start);
}
}
......@@ -4389,7 +4443,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t cnt = 0;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
SData ** sdata = pRuntimeEnv->colDataBuffer;
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
int32_t blockLoadStatus = DISK_DATA_LOADED;
......@@ -4414,12 +4467,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfRes = 0;
SBlockInfo blockInfo = {0};
if (IS_DISK_DATA_BLOCK(pQuery)) {
doHandleFileBlockImpl(pRuntimeEnv, &blockInfo, searchFn, sdata, &numOfRes, blockLoadStatus, &forwardStep);
} else {
doHandleCacheBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, &forwardStep);
}
doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep);
dTrace("QInfo:%p check data block, brange:%lld-%lld, fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d",
GET_QINFO_ADDR(pQuery), blockInfo.keyFirst, blockInfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos,
......@@ -4470,10 +4518,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
// check next block
void *pNextBlock = getGenericDataBlock(pMeterObj, pQuery, pQuery->slot);
void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
blockInfo = getBlockBasicInfo(pNextBlock, blockType);
blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
if (!checkQueryRangeAgainstNextBlock(&blockInfo, pRuntimeEnv)) {
break;
}
......@@ -4484,7 +4532,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
static void updatelastkey(SQuery *pQuery, SMeterQueryInfo *pMeterQInfo) { pMeterQInfo->lastKey = pQuery->lastKey; }
void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus, char *data,
void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus,
SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pDataHeadInfoEx, SField *pFields,
__block_search_fn_t searchFn) {
/* cache blocks may be assign to other meter, abort */
......@@ -4497,7 +4545,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32
if (pQuery->nAggTimeInterval == 0) { // not interval query
int32_t numOfRes = 0;
applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, data, pFields, searchFn, &numOfRes);
applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, pFields, searchFn, &numOfRes);
// note: only fixed number of output for each group by operation
if (numOfRes > 0) {
......@@ -4511,7 +4559,7 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32
}
} else {
applyIntervalQueryOnBlock(pSupporter, pDataHeadInfoEx, data, primaryKeys, pBlockBasicInfo, blockStatus, pFields,
applyIntervalQueryOnBlock(pSupporter, pDataHeadInfoEx, primaryKeys, pBlockBasicInfo, blockStatus, pFields,
searchFn);
}
}
......@@ -6433,7 +6481,7 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3
}
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -6444,7 +6492,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
while (1) {
int32_t numOfRes = 0;
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, sdata, pFields, searchFn, &numOfRes);
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes);
assert(steps > 0);
// NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range
......@@ -6769,7 +6817,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
}
}
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK);
SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
bool loadTS = needPrimaryTimestampCol(pQuery, &binfo);
/*
......@@ -7013,9 +7061,9 @@ static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY
doGetAlignedIntervalQueryRange(pQuery, keyInData, skey, ekey);
}
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn) {
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData,
SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields,
__block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterQueryInfo * pInfo = pInfoEx->pMeterQInfo;
......@@ -7036,7 +7084,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
((pBlockInfo->keyFirst > pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
int32_t numOfRes = 0;
/* current block is included in this interval */
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryData, data, pFields, searchFn, &numOfRes);
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryData, pFields, searchFn, &numOfRes);
assert(numOfRes <= 1 && numOfRes >= 0 && steps > 0);
if (pInfo->lastResRows == 0) {
......@@ -7047,7 +7095,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
saveIntervalQueryRange(pRuntimeEnv, pInfo);
} else {
doApplyIntervalQueryOnBlock(pSupporter, pInfo, pBlockInfo, pPrimaryData, data, pFields, searchFn);
doApplyIntervalQueryOnBlock(pSupporter, pInfo, pBlockInfo, pPrimaryData, pFields, searchFn);
}
}
......
......@@ -146,9 +146,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
if (pQuery->nAggTimeInterval == 0) {
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
dTrace(
"QInfo:%p vid:%d sid:%d id:%s, query completed, no need to scan data in cache. qrange:%lld-%lld, "
"lastKey:%lld",
dTrace("QInfo:%p vid:%d sid:%d id:%s, query completed, ignore data in cache. qrange:%lld-%lld, lastKey:%lld",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey,
pQuery->lastKey);
......@@ -183,7 +181,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
// data in this block may be flushed to disk and this block is allocated to other meter
// todo try with remain cache blocks
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
if (pBlock == NULL) {
continue;
}
......@@ -196,7 +194,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
/*
* 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next
......@@ -216,8 +214,8 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot);
TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0];
TSKEY* primaryKeys = (TSKEY*) pRuntimeEnv->primaryColBuffer->data;
// in handling file data block, the timestamp range validation is done during fetching candidate file blocks
if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
......@@ -226,15 +224,14 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
// only record the key on last block
SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK);
SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
dTrace("QInfo:%p check data block, brange:%lld-%lld, fileId:%d, slot:%d, pos:%d, bstatus:%d",
GET_QINFO_ADDR(pQuery), binfo.keyFirst, binfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos,
pRuntimeEnv->blockStatus);
totalBlocks++;
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL,
searchFn);
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn);
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
break;
......@@ -425,7 +422,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
continue;
}
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_FILE_BLOCK);
SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
assert(pQuery->pos >= 0 && pQuery->pos < pBlock->numOfPoints);
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
......@@ -441,8 +438,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
(pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
}
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pRuntimeEnv->colDataBuffer, &binfo,
pOneMeterDataInfo, pInfoEx->pBlock.fields, searchFn);
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields,
searchFn);
}
tfree(pReqMeterDataInfo);
......@@ -489,6 +486,9 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
pQInfo->pObj = pMeterObj;
pQuery->lastKey = pQuery->skey;
pRuntimeEnv->pMeterObj = pMeterObj;
vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
vnodeUpdateFilterColumnIndex(pQuery);
vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache);
......@@ -619,6 +619,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pSupporter->rawEKey = key;
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
if (num == 0) {
int32_t k = 1;
}
assert(num >= 0);
} else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode,
......@@ -686,7 +689,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return;
}
bool dataInDisk = true;
bool dataInCache = true;
if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, 0)) {
......@@ -725,9 +728,6 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
}
vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
vnodeUpdateFilterColumnIndex(pQuery);
vnodeScanAllData(pRuntimeEnv);
pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册